From 9cc9f62cd45a039c1a1b97f962930fbf3ba5f074 Mon Sep 17 00:00:00 2001 From: Andrew Arbogast Date: Thu, 15 Dec 2022 19:12:13 -0600 Subject: [PATCH 1/5] Reduce test flakiness due to resource contention and timing --- proxy/destinations/destinations_test.go | 2 +- server_test.go | 22 +++++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/proxy/destinations/destinations_test.go b/proxy/destinations/destinations_test.go index b0258c723..2c2b5e49b 100644 --- a/proxy/destinations/destinations_test.go +++ b/proxy/destinations/destinations_test.go @@ -83,7 +83,7 @@ func TestAddSingleWithFailure(t *testing.T) { assert.Eventually(t, func() bool { fixture.destinations.Wait() return true - }, 2*time.Millisecond, time.Millisecond) + }, 500*time.Millisecond, time.Millisecond) } func TestAddMultiple(t *testing.T) { diff --git a/server_test.go b/server_test.go index 5f0a63cfe..2a3e433c2 100644 --- a/server_test.go +++ b/server_test.go @@ -520,7 +520,7 @@ func TestTCPConfig(t *testing.T) { } } -func sendTCPMetrics(a *net.TCPAddr, tlsConfig *tls.Config, f *fixture) error { +func sendTCPMetrics(t *testing.T, a *net.TCPAddr, tlsConfig *tls.Config, f *fixture) error { // TODO: attempt to ensure the accept goroutine opens the port before we attempt to connect // connect and send stats in two parts var conn net.Conn @@ -549,9 +549,17 @@ func sendTCPMetrics(a *net.TCPAddr, tlsConfig *tls.Config, f *fixture) error { } // check that the server received the stats; HACK: sleep to ensure workers process before flush - time.Sleep(20 * time.Millisecond) + foundProcessed := assert.Eventually( + t, + func() bool { + return f.server.Workers[0].MetricsProcessedCount() >= 1 + }, + 500 * time.Millisecond, + 2 * time.Millisecond, + "metrics should be processed", + ) - if f.server.Workers[0].MetricsProcessedCount() < 1 { + if !foundProcessed { return fmt.Errorf("metrics were not processed") } @@ -837,7 +845,7 @@ func TestUNIXMetricsSSF(t *testing.T) { t.Log("Writing the first metric") _, err := protocol.WriteSSF(conn, testSpan) - firstCtx, firstCancel := context.WithTimeout(ctx, 20*time.Millisecond) + firstCtx, firstCancel := context.WithTimeout(ctx, 500*time.Millisecond) defer firstCancel() keepFlushing(firstCtx, f.server) if assert.NoError(t, err) { @@ -848,7 +856,7 @@ func TestUNIXMetricsSSF(t *testing.T) { firstCancel() // stop flushing like mad t.Log("Writing the second metric") - secondCtx, secondCancel := context.WithTimeout(ctx, 20*time.Millisecond) + secondCtx, secondCancel := context.WithTimeout(ctx, 500*time.Millisecond) defer secondCancel() _, err = protocol.WriteSSF(conn, testSpan) keepFlushing(secondCtx, f.server) @@ -964,7 +972,7 @@ func TestTCPMetrics(t *testing.T) { // attempt to connect and send stats with each of the client configurations for i, clientConfig := range clientConfigs { expectedSuccess := serverConfig.expectedConnectResults[i] - err := sendTCPMetrics(addr, clientConfig.tlsConfig, f) + err := sendTCPMetrics(t, addr, clientConfig.tlsConfig, f) if err != nil { if expectedSuccess { t.Errorf("server config: '%s' client config: '%s' failed: %s", @@ -987,7 +995,7 @@ func TestTCPMetrics(t *testing.T) { // TestHandleTCPGoroutineTimeout verifies that an idle TCP connection doesn't block forever. func TestHandleTCPGoroutineTimeout(t *testing.T) { - const readTimeout = 30 * time.Millisecond + const readTimeout = 500 * time.Millisecond s := &Server{ logger: logrus.NewEntry(logrus.New()), tcpReadTimeout: readTimeout, From f51ccac4047ea659e3f54589efc8b55018f12b4b Mon Sep 17 00:00:00 2001 From: Andrew Arbogast Date: Thu, 15 Dec 2022 19:22:41 -0600 Subject: [PATCH 2/5] go fmt --- networking.go | 2 +- server_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/networking.go b/networking.go index b65fda05b..a7b93225a 100644 --- a/networking.go +++ b/networking.go @@ -341,7 +341,7 @@ type grpcStatsServer struct { server *Server } -//This is the function that fulfils the ssf server proto +// This is the function that fulfils the ssf server proto func (grpcsrv *grpcStatsServer) SendPacket(ctx context.Context, packet *dogstatsd.DogstatsdPacket) (*dogstatsd.Empty, error) { //We use processMetricPacket instead of handleMetricPacket because process can split the byte array into multiple packets if needed grpcsrv.server.processMetricPacket(len(packet.GetPacketBytes()), packet.GetPacketBytes(), nil, DOGSTATSD_GRPC) diff --git a/server_test.go b/server_test.go index 2a3e433c2..0cd378436 100644 --- a/server_test.go +++ b/server_test.go @@ -554,8 +554,8 @@ func sendTCPMetrics(t *testing.T, a *net.TCPAddr, tlsConfig *tls.Config, f *fixt func() bool { return f.server.Workers[0].MetricsProcessedCount() >= 1 }, - 500 * time.Millisecond, - 2 * time.Millisecond, + 500*time.Millisecond, + 2*time.Millisecond, "metrics should be processed", ) From f5bd32cc37b8ef54159bf741abb4fd8839839533 Mon Sep 17 00:00:00 2001 From: Andrew Arbogast Date: Thu, 15 Dec 2022 19:23:56 -0600 Subject: [PATCH 3/5] Revert "go fmt" This reverts commit f51ccac4047ea659e3f54589efc8b55018f12b4b. --- networking.go | 2 +- server_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/networking.go b/networking.go index a7b93225a..b65fda05b 100644 --- a/networking.go +++ b/networking.go @@ -341,7 +341,7 @@ type grpcStatsServer struct { server *Server } -// This is the function that fulfils the ssf server proto +//This is the function that fulfils the ssf server proto func (grpcsrv *grpcStatsServer) SendPacket(ctx context.Context, packet *dogstatsd.DogstatsdPacket) (*dogstatsd.Empty, error) { //We use processMetricPacket instead of handleMetricPacket because process can split the byte array into multiple packets if needed grpcsrv.server.processMetricPacket(len(packet.GetPacketBytes()), packet.GetPacketBytes(), nil, DOGSTATSD_GRPC) diff --git a/server_test.go b/server_test.go index 0cd378436..2a3e433c2 100644 --- a/server_test.go +++ b/server_test.go @@ -554,8 +554,8 @@ func sendTCPMetrics(t *testing.T, a *net.TCPAddr, tlsConfig *tls.Config, f *fixt func() bool { return f.server.Workers[0].MetricsProcessedCount() >= 1 }, - 500*time.Millisecond, - 2*time.Millisecond, + 500 * time.Millisecond, + 2 * time.Millisecond, "metrics should be processed", ) From 5f861b3311e8308e0205175cd563df40f994ba28 Mon Sep 17 00:00:00 2001 From: Andrew Arbogast Date: Thu, 15 Dec 2022 19:24:08 -0600 Subject: [PATCH 4/5] go fmt --- server_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server_test.go b/server_test.go index 2a3e433c2..0cd378436 100644 --- a/server_test.go +++ b/server_test.go @@ -554,8 +554,8 @@ func sendTCPMetrics(t *testing.T, a *net.TCPAddr, tlsConfig *tls.Config, f *fixt func() bool { return f.server.Workers[0].MetricsProcessedCount() >= 1 }, - 500 * time.Millisecond, - 2 * time.Millisecond, + 500*time.Millisecond, + 2*time.Millisecond, "metrics should be processed", ) From 5c811ae5a633e07b51c70e17be93c3ee46d338ff Mon Sep 17 00:00:00 2001 From: Andrew Arbogast Date: Thu, 15 Dec 2022 19:29:17 -0600 Subject: [PATCH 5/5] This is used for expected fail cases, so use a fake testing.T --- server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server_test.go b/server_test.go index 0cd378436..b226e2a53 100644 --- a/server_test.go +++ b/server_test.go @@ -550,7 +550,7 @@ func sendTCPMetrics(t *testing.T, a *net.TCPAddr, tlsConfig *tls.Config, f *fixt // check that the server received the stats; HACK: sleep to ensure workers process before flush foundProcessed := assert.Eventually( - t, + &testing.T{}, func() bool { return f.server.Workers[0].MetricsProcessedCount() >= 1 },