diff --git a/agent/agent_test.go b/agent/agent_test.go index 9570bec08bb8..67a6adf3365f 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -18,6 +18,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "testing" "time" @@ -3729,10 +3730,27 @@ func TestAgent_SecurityChecks(t *testing.T) { defer a.Shutdown() data := make([]byte, 0, 8192) - bytesBuffer := bytes.NewBuffer(data) - a.LogOutput = bytesBuffer + buf := &syncBuffer{b: bytes.NewBuffer(data)} + a.LogOutput = buf assert.NoError(t, a.Start(t)) - assert.Contains(t, bytesBuffer.String(), "using enable-script-checks without ACLs and without allow_write_http_from is DANGEROUS") + assert.Contains(t, buf.String(), "using enable-script-checks without ACLs and without allow_write_http_from is DANGEROUS") +} + +type syncBuffer struct { + lock sync.RWMutex + b *bytes.Buffer +} + +func (b *syncBuffer) Write(data []byte) (int, error) { + b.lock.Lock() + defer b.lock.Unlock() + return b.b.Write(data) +} + +func (b *syncBuffer) String() string { + b.lock.Lock() + defer b.lock.Unlock() + return b.b.String() } func TestAgent_ReloadConfigOutgoingRPCConfig(t *testing.T) { diff --git a/agent/consul/acl_test.go b/agent/consul/acl_test.go index 3cf6b71dc54a..c85e5c2a0926 100644 --- a/agent/consul/acl_test.go +++ b/agent/consul/acl_test.go @@ -310,43 +310,51 @@ func testIdentityForToken(token string) (bool, structs.ACLIdentity, error) { func testPolicyForID(policyID string) (bool, *structs.ACLPolicy, error) { switch policyID { case "acl-ro": - return true, &structs.ACLPolicy{ + p := &structs.ACLPolicy{ ID: "acl-ro", Name: "acl-ro", Description: "acl-ro", Rules: `acl = "read"`, Syntax: acl.SyntaxCurrent, RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2}, - }, nil + } + p.SetHash(false) + return true, p, nil case "acl-wr": - return true, &structs.ACLPolicy{ + p := &structs.ACLPolicy{ ID: "acl-wr", Name: "acl-wr", Description: "acl-wr", Rules: `acl = "write"`, Syntax: acl.SyntaxCurrent, RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2}, - }, nil + } + p.SetHash(false) + return true, p, nil case "service-ro": - return true, &structs.ACLPolicy{ + p := &structs.ACLPolicy{ ID: "service-ro", Name: "service-ro", Description: "service-ro", Rules: `service_prefix "" { policy = "read" }`, Syntax: acl.SyntaxCurrent, RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2}, - }, nil + } + p.SetHash(false) + return true, p, nil case "service-wr": - return true, &structs.ACLPolicy{ + p := &structs.ACLPolicy{ ID: "service-wr", Name: "service-wr", Description: "service-wr", Rules: `service_prefix "" { policy = "write" }`, Syntax: acl.SyntaxCurrent, RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2}, - }, nil + } + p.SetHash(false) + return true, p, nil case "node-wr": - return true, &structs.ACLPolicy{ + p := &structs.ACLPolicy{ ID: "node-wr", Name: "node-wr", Description: "node-wr", @@ -354,9 +362,11 @@ func testPolicyForID(policyID string) (bool, *structs.ACLPolicy, error) { Syntax: acl.SyntaxCurrent, Datacenters: []string{"dc1"}, RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2}, - }, nil + } + p.SetHash(false) + return true, p, nil case "dc2-key-wr": - return true, &structs.ACLPolicy{ + p := &structs.ACLPolicy{ ID: "dc2-key-wr", Name: "dc2-key-wr", Description: "dc2-key-wr", @@ -364,7 +374,9 @@ func testPolicyForID(policyID string) (bool, *structs.ACLPolicy, error) { Syntax: acl.SyntaxCurrent, Datacenters: []string{"dc2"}, RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2}, - }, nil + } + p.SetHash(false) + return true, p, nil default: return testPolicyForIDEnterprise(policyID) } diff --git a/agent/consul/leader_connect_test.go b/agent/consul/leader_connect_test.go index 4d713e0316de..712a7efd6c7f 100644 --- a/agent/consul/leader_connect_test.go +++ b/agent/consul/leader_connect_test.go @@ -373,7 +373,10 @@ func TestLeader_Vault_PrimaryCA_IntermediateRenew(t *testing.T) { } }) defer os.RemoveAll(dir1) - defer s1.Shutdown() + defer func() { + s1.Shutdown() + s1.leaderRoutineManager.Wait() + }() testrpc.WaitForLeader(t, s1.RPC, "dc1") @@ -482,7 +485,10 @@ func TestLeader_SecondaryCA_IntermediateRenew(t *testing.T) { } }) defer os.RemoveAll(dir1) - defer s1.Shutdown() + defer func() { + s1.Shutdown() + s1.leaderRoutineManager.Wait() + }() testrpc.WaitForLeader(t, s1.RPC, "dc1") @@ -493,7 +499,10 @@ func TestLeader_SecondaryCA_IntermediateRenew(t *testing.T) { c.Build = "1.6.0" }) defer os.RemoveAll(dir2) - defer s2.Shutdown() + defer func() { + s2.Shutdown() + s2.leaderRoutineManager.Wait() + }() // Create the WAN link joinWAN(t, s2, s1) diff --git a/agent/http_test.go b/agent/http_test.go index cddded077104..d391af3c11ef 100644 --- a/agent/http_test.go +++ b/agent/http_test.go @@ -20,6 +20,11 @@ import ( "time" "github.com/NYTimes/gziphandler" + cleanhttp "github.com/hashicorp/go-cleanhttp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/http2" + "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/structs" tokenStore "github.com/hashicorp/consul/agent/token" @@ -27,10 +32,6 @@ import ( "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - cleanhttp "github.com/hashicorp/go-cleanhttp" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/net/http2" ) func TestHTTPServer_UnixSocket(t *testing.T) { @@ -632,7 +633,7 @@ func TestHTTP_wrap_obfuscateLog(t *testing.T) { } t.Parallel() - buf := new(bytes.Buffer) + buf := &syncBuffer{b: new(bytes.Buffer)} a := StartTestAgent(t, TestAgent{LogOutput: buf}) defer a.Shutdown() diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go index 876d9f144019..f6b134593d81 100644 --- a/agent/service_manager_test.go +++ b/agent/service_manager_test.go @@ -8,11 +8,12 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestServiceManager_RegisterService(t *testing.T) { @@ -330,26 +331,27 @@ func TestServiceManager_PersistService_API(t *testing.T) { testrpc.WaitForLeader(t, a.RPC, "dc1") - // Now register a sidecar proxy via the API. - svc := &structs.NodeService{ - Kind: structs.ServiceKindConnectProxy, - ID: "web-sidecar-proxy", - Service: "web-sidecar-proxy", - Port: 21000, - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "web", - DestinationServiceID: "web", - LocalServiceAddress: "127.0.0.1", - LocalServicePort: 8000, - Upstreams: structs.Upstreams{ - { - DestinationName: "redis", - DestinationNamespace: "default", - LocalBindPort: 5000, + newNodeService := func() *structs.NodeService { + return &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-sidecar-proxy", + Service: "web-sidecar-proxy", + Port: 21000, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + DestinationServiceID: "web", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 8000, + Upstreams: structs.Upstreams{ + { + DestinationName: "redis", + DestinationNamespace: "default", + LocalBindPort: 5000, + }, }, }, - }, - EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + } } expectState := &structs.NodeService{ @@ -385,6 +387,7 @@ func TestServiceManager_PersistService_API(t *testing.T) { EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } + svc := newNodeService() svcID := svc.CompoundServiceID() svcFile := filepath.Join(a.Config.DataDir, servicesDir, svcID.StringHash()) @@ -443,8 +446,15 @@ func TestServiceManager_PersistService_API(t *testing.T) { } // Updates service definition on disk + svc = newNodeService() svc.Proxy.LocalServicePort = 8001 - require.NoError(a.addServiceFromSource(svc, nil, true, "mytoken", ConfigSourceRemote)) + err = a.AddService(AddServiceRequest{ + Service: svc, + persist: true, + token: "mytoken", + Source: ConfigSourceRemote, + }) + require.NoError(err) requireFileIsPresent(t, svcFile) requireFileIsPresent(t, configFile) diff --git a/agent/testagent.go b/agent/testagent.go index 11410f20807c..ee1f7f510f0c 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -53,6 +53,8 @@ type TestAgent struct { Config *config.RuntimeConfig // LogOutput is the sink for the logs. If nil, logs are written to os.Stderr. + // The io.Writer must allow concurrent reads and writes. Note that + // bytes.Buffer is not safe for concurrent reads and writes. LogOutput io.Writer // DataDir may be set to a directory which exists. If is it not set, @@ -343,8 +345,8 @@ func (a *TestAgent) Client() *api.Client { // DNSDisableCompression disables compression for all started DNS servers. func (a *TestAgent) DNSDisableCompression(b bool) { for _, srv := range a.dnsServers { - cfg := srv.config.Load().(*dnsConfig) - cfg.DisableCompression = b + a.config.DNSDisableCompression = b + srv.ReloadConfig(a.config) } } diff --git a/go.mod b/go.mod index eb10c2ad209c..994bb69e38d7 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/hashicorp/raft v1.3.1 github.com/hashicorp/raft-autopilot v0.1.5 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea - github.com/hashicorp/serf v0.9.5 + github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9 github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086 github.com/hashicorp/yamux v0.0.0-20200609203250-aecfd211c9ce github.com/imdario/mergo v0.3.6 diff --git a/go.sum b/go.sum index 672adbcf531a..72f97a47e40b 100644 --- a/go.sum +++ b/go.sum @@ -291,8 +291,9 @@ github.com/hashicorp/raft-autopilot v0.1.5 h1:onEfMH5uHVdXQqtas36zXUHEZxLdsJVu/n github.com/hashicorp/raft-autopilot v0.1.5/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= -github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM= github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk= +github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9 h1:lCZfMBDn/Puwg9VosHMf/9p9jNDYYkbzVjb4jYjVfqU= +github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9/go.mod h1:qapjppkpNXHYTyzx+HqkyWGGkmUxafHjuspm/Bqb2Jc= github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086 h1:OKsyxKi2sNmqm1Gv93adf2AID2FOBFdCbbZn9fGtIdg= github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086/go.mod h1:R3Umvhlxi2TN7Ex2hzOowyeNb+SfbVWI973N+ctaFMk= github.com/hashicorp/vault/sdk v0.1.14-0.20200519221838-e0cfd64bc267 h1:e1ok06zGrWJW91rzRroyl5nRNqraaBe4d5hiKcVZuHM= diff --git a/lib/routine/routine.go b/lib/routine/routine.go index e53bbb59a578..48fc2aef4e6f 100644 --- a/lib/routine/routine.go +++ b/lib/routine/routine.go @@ -24,6 +24,10 @@ func (r *routineTracker) running() bool { } } +func (r *routineTracker) wait() { + <-r.stoppedCh +} + type Manager struct { lock sync.RWMutex logger hclog.Logger @@ -131,6 +135,8 @@ func (m *Manager) stopInstance(name string) *routineTracker { return instance } +// StopAll goroutines. Once StopAll is called, it is no longer safe to add no +// goroutines to the Manager. func (m *Manager) StopAll() { m.lock.Lock() defer m.lock.Unlock() @@ -142,7 +148,14 @@ func (m *Manager) StopAll() { m.logger.Debug("stopping routine", "routine", name) routine.cancel() } +} - // just wipe out the entire map - m.routines = make(map[string]*routineTracker) +// Wait for all goroutines to stop after StopAll is called. +func (m *Manager) Wait() { + m.lock.Lock() + defer m.lock.Unlock() + + for _, routine := range m.routines { + routine.wait() + } } diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index 8a7c069c2a99..4f01fe0f610c 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -958,7 +958,7 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) { member.Status = StatusAlive member.leaveTime = time.Time{} - member.Addr = net.IP(n.Addr) + member.Addr = n.Addr member.Port = n.Port member.Tags = s.decodeTags(n.Meta) } @@ -1088,6 +1088,7 @@ func (s *Serf) handleNodeUpdate(n *memberlist.Node) { // handleNodeLeaveIntent is called when an intent to leave is received. func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { + state := s.State() // Witness a potentially newer time s.clock.Witness(leaveMsg.LTime) @@ -1108,7 +1109,7 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { // Refute us leaving if we are in the alive state // Must be done in another goroutine since we have the memberLock - if leaveMsg.Node == s.config.NodeName && s.state == SerfAlive { + if leaveMsg.Node == s.config.NodeName && state == SerfAlive { s.logger.Printf("[DEBUG] serf: Refuting an older leave intent") go s.broadcastJoin(s.clock.Time()) return false @@ -1639,7 +1640,6 @@ func (s *Serf) reconnect() { // Select a random member to try and join idx := rand.Int31n(int32(n)) mem := s.failedMembers[idx] - s.memberLock.RUnlock() // Format the addr addr := net.UDPAddr{IP: mem.Addr, Port: int(mem.Port)} @@ -1649,6 +1649,7 @@ func (s *Serf) reconnect() { if mem.Name != "" { joinAddr = mem.Name + "/" + addr.String() } + s.memberLock.RUnlock() // Attempt to join at the memberlist level s.memberlist.Join([]string{joinAddr}) diff --git a/vendor/modules.txt b/vendor/modules.txt index b17354e9a7ea..68021ab115c9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -489,7 +489,7 @@ github.com/hashicorp/raft github.com/hashicorp/raft-autopilot # github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea github.com/hashicorp/raft-boltdb -# github.com/hashicorp/serf v0.9.5 +# github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9 github.com/hashicorp/serf/coordinate github.com/hashicorp/serf/serf # github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086