Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"strings"
"time"

"github.com/hashicorp/go-metrics/compat"
metrics "github.com/hashicorp/go-metrics/compat"
"github.com/hashicorp/go-multierror"
)

Expand Down Expand Up @@ -140,6 +140,15 @@ type Config struct {
// whether to perform TCP pings on a node-by-node basis.
DisableTcpPingsForNode func(nodeName string) bool

// DisableUdpPings will turn off all UDP pings (both direct and indirect)
// that are attempted during node probing. This forces the use of TCP-only
// communication for failure detection.
DisableUdpPings bool

// DisableUdpPingsForNode is like DisableUdpPings, but lets you control
// whether to perform UDP pings on a node-by-node basis.
DisableUdpPingsForNode func(nodeName string) bool

// AwarenessMaxMultiplier will increase the probe interval if the node
// becomes aware that it might be degraded and not meeting the soft real
// time requirements to reliably probe other nodes.
Expand Down Expand Up @@ -317,6 +326,7 @@ func DefaultLANConfig() *Config {
ProbeTimeout: 500 * time.Millisecond, // Reasonable RTT time for LAN
ProbeInterval: 1 * time.Second, // Failure check every second
DisableTcpPings: false, // TCP pings are safe, even with mixed versions
DisableUdpPings: false, // UDP pings are enabled by default
AwarenessMaxMultiplier: 8, // Probe interval backs off to 8 seconds

GossipNodes: 3, // Gossip to 3 nodes
Expand Down
5 changes: 5 additions & 0 deletions memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
return nil, fmt.Errorf("cannot specify both LogOutput and Logger; please choose a single log configuration setting")
}

// Validate that both TCP and UDP pings are not disabled simultaneously
if conf.DisableTcpPings && conf.DisableUdpPings {
return nil, fmt.Errorf("cannot disable both TCP and UDP pings; at least one ping method must be enabled")
}

logDest := conf.LogOutput
if logDest == nil {
logDest = os.Stderr
Expand Down
79 changes: 52 additions & 27 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,23 @@ func (m *Memberlist) probeNode(node *nodeState) {
deadline := sent.Add(probeInterval)
addr := node.Address()

// Check if UDP pings are disabled for this node
disableUdpPings := m.config.DisableUdpPings ||
(m.config.DisableUdpPingsForNode != nil && m.config.DisableUdpPingsForNode(node.Name))

// Arrange for our self-awareness to get updated.
var awarenessDelta int
defer func() {
m.awareness.ApplyDelta(awarenessDelta)
}()

if disableUdpPings {
// UDP pings are disabled, skip UDP ping entirely and go directly to
// indirect pings and TCP fallback
m.logger.Printf("[DEBUG] memberlist: UDP pings disabled for node %s, skipping UDP ping", node.Name)
goto HANDLE_REMOTE_FAILURE
}

if node.State == StateAlive {
if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send UDP ping: %s", err)
Expand Down Expand Up @@ -414,37 +426,42 @@ func (m *Memberlist) probeNode(node *nodeState) {
}

HANDLE_REMOTE_FAILURE:
// Get some random live nodes.
m.nodeLock.RLock()
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != StateAlive
})
m.nodeLock.RUnlock()

// Attempt an indirect ping.
expectedNacks := 0
selfAddr, selfPort = m.getAdvertise()
ind := indirectPingReq{
SeqNo: ping.SeqNo,
Target: node.Addr,
Port: node.Port,
Node: node.Name,
SourceAddr: selfAddr,
SourcePort: selfPort,
SourceNode: m.config.Name,
}
for _, peer := range kNodes {
// We only expect nack to be sent from peers who understand
// version 4 of the protocol.
if ind.Nack = peer.PMax >= 4; ind.Nack {
expectedNacks++
if !disableUdpPings {
// Get some random live nodes for indirect UDP pings.
m.nodeLock.RLock()
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != StateAlive
})
m.nodeLock.RUnlock()

// Attempt an indirect ping.
selfAddr, selfPort = m.getAdvertise()
ind := indirectPingReq{
SeqNo: ping.SeqNo,
Target: node.Addr,
Port: node.Port,
Node: node.Name,
SourceAddr: selfAddr,
SourcePort: selfPort,
SourceNode: m.config.Name,
}
for _, peer := range kNodes {
// We only expect nack to be sent from peers who understand
// version 4 of the protocol.
if ind.Nack = peer.PMax >= 4; ind.Nack {
expectedNacks++
}

if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect UDP ping: %s", err)
if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect UDP ping: %s", err)
}
}
} else {
m.logger.Printf("[DEBUG] memberlist: UDP pings disabled for node %s, skipping indirect UDP pings", node.Name)
}

// Also make an attempt to contact the node directly over TCP. This
Expand Down Expand Up @@ -493,7 +510,15 @@ HANDLE_REMOTE_FAILURE:
// any additional time here.
for didContact := range fallbackCh {
if didContact {
m.logger.Printf("[WARN] memberlist: Was able to connect to %s over TCP but UDP probes failed, network may be misconfigured", node.Name)
// Check if UDP pings were disabled for this node
disableUdpPings := m.config.DisableUdpPings ||
(m.config.DisableUdpPingsForNode != nil && m.config.DisableUdpPingsForNode(node.Name))

if disableUdpPings {
m.logger.Printf("[INFO] memberlist: Successfully connected to %s over TCP (UDP pings disabled for this node)", node.Name)
} else {
m.logger.Printf("[WARN] memberlist: Was able to connect to %s over TCP but UDP probes failed, network may be misconfigured", node.Name)
}
return
}
}
Expand Down
125 changes: 125 additions & 0 deletions state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2588,6 +2588,131 @@ func TestVerifyProtocol(t *testing.T) {
}
}

func TestMemberList_Validation_BothPingsDisabled(t *testing.T) {
c := DefaultLANConfig()
c.Name = "test"
c.DisableTcpPings = true
c.DisableUdpPings = true

_, err := Create(c)
require.Error(t, err)
require.Contains(t, err.Error(), "cannot disable both TCP and UDP pings")
}

func TestMemberList_DisableUdpPings_Functional(t *testing.T) {
net := &MockNetwork{}

// Create memberlist with UDP pings disabled
c1 := DefaultLANConfig()
c1.Name = "node1"
c1.Transport = net.NewTransport("node1")
c1.DisableUdpPings = true
c1.ProbeTimeout = 10 * time.Millisecond
c1.ProbeInterval = 50 * time.Millisecond

m1, err := Create(c1)
require.NoError(t, err)
defer func() {
if err := m1.Shutdown(); err != nil {
t.Fatal(err)
}
}()

// Create a second memberlist
c2 := DefaultLANConfig()
c2.Name = "node2"
c2.Transport = net.NewTransport("node2")
c2.ProbeTimeout = 10 * time.Millisecond
c2.ProbeInterval = 50 * time.Millisecond

m2, err := Create(c2)
require.NoError(t, err)
defer func() {
if err := m2.Shutdown(); err != nil {
t.Fatal(err)
}
}()

num, err := m2.Join([]string{c1.Name + "/" + c1.Transport.(*MockTransport).addr.String()})
require.NoError(t, err)
require.Equal(t, 1, num)

waitUntilSize(t, m1, 2)
waitUntilSize(t, m2, 2)

// Get the node state for node2 from m1's perspective
m1.nodeLock.RLock()
node2State := m1.nodeMap["node2"]
m1.nodeLock.RUnlock()
require.NotNil(t, node2State)

// Probe node2 - this should skip UDP pings and go directly to indirect/TCP
// Since UDP pings are disabled, it should still work via TCP fallback
m1.probeNode(node2State)

// The node should still be alive (TCP fallback should work)
require.Equal(t, StateAlive, node2State.State)
}

func TestMemberList_DisableUdpPingsForNode_Functional(t *testing.T) {
net := &MockNetwork{}

// Create memberlist with UDP pings disabled for specific node
c1 := DefaultLANConfig()
c1.Name = "node1"
c1.Transport = net.NewTransport("node1")
c1.DisableUdpPingsForNode = func(nodeName string) bool {
return nodeName == "node2"
}
c1.ProbeTimeout = 10 * time.Millisecond
c1.ProbeInterval = 50 * time.Millisecond

m1, err := Create(c1)
require.NoError(t, err)
defer func() {
if err := m1.Shutdown(); err != nil {
t.Fatal(err)
}
}()

// Create a second memberlist
c2 := DefaultLANConfig()
c2.Name = "node2"
c2.Transport = net.NewTransport("node2")
c2.ProbeTimeout = 10 * time.Millisecond
c2.ProbeInterval = 50 * time.Millisecond

m2, err := Create(c2)
require.NoError(t, err)
defer func() {
if err := m2.Shutdown(); err != nil {
t.Fatal(err)
}
}()

// Join the nodes
num, err := m2.Join([]string{c1.Name + "/" + c1.Transport.(*MockTransport).addr.String()})
require.NoError(t, err)
require.Equal(t, 1, num)

// Wait for them to see each other
waitUntilSize(t, m1, 2)
waitUntilSize(t, m2, 2)

// Get the node state for node2 from m1's perspective
m1.nodeLock.RLock()
node2State := m1.nodeMap["node2"]
m1.nodeLock.RUnlock()
require.NotNil(t, node2State)

// Probe node2 - this should skip UDP pings for node2 specifically
// Since UDP pings are disabled for node2, it should still work via TCP fallback
m1.probeNode(node2State)

// The node should still be alive (TCP fallback should work)
require.Equal(t, StateAlive, node2State.State)
}

func testVerifyProtocolSingle(t *testing.T, A [][6]uint8, B [][6]uint8, expect bool) {
m := GetMemberlist(t, nil)
defer func() {
Expand Down