Skip to content

Commit

Permalink
Merge pull request #232 from hashicorp/dnephin/fix-data-race
Browse files Browse the repository at this point in the history
Fix data races
  • Loading branch information
dnephin committed Mar 24, 2021
2 parents 822dee6 + 7628fe3 commit a43ca4a
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 82 deletions.
17 changes: 2 additions & 15 deletions .circleci/config.yml
Expand Up @@ -4,7 +4,7 @@ version: 2.1
executors:
go:
docker:
- image: docker.mirror.hashicorp.services/circleci/golang:1.12.8
- image: docker.mirror.hashicorp.services/circleci/golang:1.15
environment:
- TEST_RESULTS: /tmp/test-results # path to where test results are saved

Expand All @@ -13,20 +13,8 @@ jobs:
executor: go
steps:
- checkout

# Restore go module cache if there is one
- restore_cache:
keys:
- memberlist-modcache-v1-{{ checksum "go.mod" }}

- run: go mod download

# Save go module cache if the go.mod file has changed
- save_cache:
key: memberlist-modcache-v1-{{ checksum "go.mod" }}
paths:
- "/go/pkg/mod"

# check go fmt output because it does not report non-zero when there are fmt changes
- run:
name: check go fmt
Expand All @@ -52,8 +40,7 @@ jobs:

# run go tests with gotestsum
- run: |
PACKAGE_NAMES=$(go list ./...)
gotestsum --format=short-verbose --junitfile $TEST_RESULTS/gotestsum-report.xml -- $PACKAGE_NAMES
gotestsum --format=short-verbose --junitfile $TEST_RESULTS/gotestsum-report.xml -- -race
- store_test_results:
path: /tmp/test-results
- store_artifacts:
Expand Down
9 changes: 6 additions & 3 deletions event_delegate.go
Expand Up @@ -49,13 +49,16 @@ type NodeEvent struct {
}

func (c *ChannelEventDelegate) NotifyJoin(n *Node) {
c.Ch <- NodeEvent{NodeJoin, n}
node := *n
c.Ch <- NodeEvent{NodeJoin, &node}
}

func (c *ChannelEventDelegate) NotifyLeave(n *Node) {
c.Ch <- NodeEvent{NodeLeave, n}
node := *n
c.Ch <- NodeEvent{NodeLeave, &node}
}

func (c *ChannelEventDelegate) NotifyUpdate(n *Node) {
c.Ch <- NodeEvent{NodeUpdate, n}
node := *n
c.Ch <- NodeEvent{NodeUpdate, &node}
}
3 changes: 2 additions & 1 deletion integ_test.go
Expand Up @@ -2,6 +2,7 @@ package memberlist

import (
"fmt"
"log"
"os"
"testing"
"time"
Expand Down Expand Up @@ -41,7 +42,7 @@ func TestMemberlist_Integ(t *testing.T) {
c.GossipInterval = 20 * time.Millisecond
c.PushPullInterval = 200 * time.Millisecond
c.SecretKey = secret
c.Logger = testLoggerWithName(t, c.Name)
c.Logger = log.New(os.Stderr, c.Name, log.LstdFlags)

if i == 0 {
c.Events = &ChannelEventDelegate{eventCh}
Expand Down
20 changes: 6 additions & 14 deletions memberlist_test.go
Expand Up @@ -46,10 +46,8 @@ func testConfigNet(tb testing.TB, network byte) *Config {
config.BindAddr = getBindAddrNet(network).String()
config.Name = config.BindAddr
config.BindPort = 0 // choose free port
if tb != nil {
config.Logger = testLoggerWithName(tb, config.Name)
}
config.RequireNodeNames = true
config.Logger = log.New(os.Stderr, config.Name, log.LstdFlags)
return config
}

Expand Down Expand Up @@ -186,7 +184,6 @@ func TestCreate_protocolVersion(t *testing.T) {
c := DefaultLANConfig()
c.BindAddr = getBindAddr().String()
c.ProtocolVersion = tc.version
c.Logger = testLogger(t)

m, err := Create(c)
if err == nil {
Expand Down Expand Up @@ -219,7 +216,6 @@ func TestCreate_secretKey(t *testing.T) {
c := DefaultLANConfig()
c.BindAddr = getBindAddr().String()
c.SecretKey = tc.key
c.Logger = testLogger(t)

m, err := Create(c)
if err == nil {
Expand All @@ -239,7 +235,6 @@ func TestCreate_secretKeyEmpty(t *testing.T) {
c := DefaultLANConfig()
c.BindAddr = getBindAddr().String()
c.SecretKey = make([]byte, 0)
c.Logger = testLogger(t)

m, err := Create(c)
require.NoError(t, err)
Expand All @@ -253,7 +248,6 @@ func TestCreate_secretKeyEmpty(t *testing.T) {
func TestCreate_keyringOnly(t *testing.T) {
c := DefaultLANConfig()
c.BindAddr = getBindAddr().String()
c.Logger = testLogger(t)

keyring, err := NewKeyring(nil, make([]byte, 16))
require.NoError(t, err)
Expand All @@ -271,7 +265,6 @@ func TestCreate_keyringOnly(t *testing.T) {
func TestCreate_keyringAndSecretKey(t *testing.T) {
c := DefaultLANConfig()
c.BindAddr = getBindAddr().String()
c.Logger = testLogger(t)

keyring, err := NewKeyring(nil, make([]byte, 16))
require.NoError(t, err)
Expand Down Expand Up @@ -1322,14 +1315,14 @@ func TestMemberlist_SendTo(t *testing.T) {

// Ensure we got the messages
if len(msgs1) != 1 {
t.Fatalf("should have 1 messages!")
t.Fatalf("expected 1 message, got %d", len(msgs1))
}
if !reflect.DeepEqual(msgs1[0], []byte("pong")) {
t.Fatalf("bad msg %v", msgs1[0])
}

if len(msgs2) != 1 {
t.Fatalf("should have 1 messages!")
t.Fatalf("should have 1 message, got %d", len(msgs2))
}
if !reflect.DeepEqual(msgs2[0], []byte("ping")) {
t.Fatalf("bad msg %v", msgs2[0])
Expand Down Expand Up @@ -1483,7 +1476,7 @@ func TestMemberlist_Join_IPv6(t *testing.T) {
c1.Name = "A"
c1.BindAddr = "[::1]"
c1.BindPort = 0 // choose free
c1.Logger = testLoggerWithName(t, c1.Name)
c1.Logger = log.New(os.Stderr, c1.Name, log.LstdFlags)

m1, err := Create(c1)
require.NoError(t, err)
Expand All @@ -1494,7 +1487,7 @@ func TestMemberlist_Join_IPv6(t *testing.T) {
c2.Name = "B"
c2.BindAddr = "[::1]"
c2.BindPort = 0 // choose free
c2.Logger = testLoggerWithName(t, c2.Name)
c2.Logger = log.New(os.Stderr, c2.Name, log.LstdFlags)

m2, err := Create(c2)
require.NoError(t, err)
Expand Down Expand Up @@ -1558,7 +1551,6 @@ func TestAdvertiseAddr(t *testing.T) {
c.BindAddr = bindAddr.String()
c.BindPort = bindPort
c.Name = c.BindAddr
c.Logger = testLoggerWithName(t, c.Name)

c.AdvertiseAddr = advertiseAddr.String()
c.AdvertisePort = advertisePort
Expand Down Expand Up @@ -1783,7 +1775,7 @@ func TestMemberlist_EncryptedGossipTransition(t *testing.T) {
// Set the gossip interval fast enough to get a reasonable test,
// but slow enough to avoid "sendto: operation not permitted"
conf.GossipInterval = 100 * time.Millisecond
conf.Logger = testLoggerWithName(t, shortName)
conf.Logger = log.New(os.Stderr, shortName, log.LstdFlags)

pretty[conf.Name] = shortName
return conf
Expand Down
1 change: 0 additions & 1 deletion net_test.go
Expand Up @@ -660,7 +660,6 @@ func TestEncryptDecryptState(t *testing.T) {
SecretKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
ProtocolVersion: ProtocolVersionMax,
}
config.Logger = testLogger(t)

m, err := Create(config)
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions state.go
Expand Up @@ -602,13 +602,13 @@ func (m *Memberlist) gossip() {
addr := node.Address()
if len(msgs) == 1 {
// Send single message as is
if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, msgs[0]); err != nil {
if err := m.rawSendMsgPacket(node.FullAddress(), &node, msgs[0]); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
} else {
// Otherwise create and send a compound message
compound := makeCompoundMessage(msgs)
if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil {
if err := m.rawSendMsgPacket(node.FullAddress(), &node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
}
Expand Down Expand Up @@ -1198,9 +1198,14 @@ func (m *Memberlist) suspectNode(s *suspect) {
min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
fn := func(numConfirmations int) {
var d *dead

m.nodeLock.Lock()
state, ok := m.nodeMap[s.Node]
timeout := ok && state.State == StateSuspect && state.StateChange == changeTime
if timeout {
d = &dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
}
m.nodeLock.Unlock()

if timeout {
Expand All @@ -1210,8 +1215,8 @@ func (m *Memberlist) suspectNode(s *suspect) {

m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
state.Name, numConfirmations)
d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
m.deadNode(&d)

m.deadNode(d)
}
}
m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
Expand Down
17 changes: 12 additions & 5 deletions state_test.go
Expand Up @@ -3,9 +3,13 @@ package memberlist
import (
"bytes"
"fmt"
"log"
"net"
"os"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -20,7 +24,7 @@ func HostMemberlist(host string, t *testing.T, f func(*Config)) *Memberlist {
c.Name = host
c.BindAddr = host
c.BindPort = 0 // choose a free port
c.Logger = testLoggerWithName(t, host)
c.Logger = log.New(os.Stderr, host, log.LstdFlags)
if f != nil {
f(c)
}
Expand Down Expand Up @@ -126,8 +130,8 @@ func TestMemberList_ProbeNode_Suspect(t *testing.T) {
time.Sleep(10 * time.Millisecond)

// One of the peers should have attempted an indirect probe.
if m2.sequenceNum != 1 && m3.sequenceNum != 1 {
t.Fatalf("bad seqnos %v, %v", m2.sequenceNum, m3.sequenceNum)
if s2, s3 := atomic.LoadUint32(&m2.sequenceNum), atomic.LoadUint32(&m3.sequenceNum); s2 != 1 && s3 != 1 {
t.Fatalf("bad seqnos, expected both to be 1: %v, %v", s2, s3)
}
}

Expand Down Expand Up @@ -784,9 +788,12 @@ func TestMemberList_ProbeNode_Awareness_MissedNack(t *testing.T) {
probeTime := time.Now().Sub(startProbe)

// Node should be reported suspect.

m1.nodeLock.Lock()
if n.State != StateSuspect {
t.Fatalf("expect node to be suspect")
}
m1.nodeLock.Unlock()

// Make sure we timed out approximately on time.
if probeTime > probeTimeMax {
Expand Down Expand Up @@ -1420,8 +1427,8 @@ func TestMemberList_AliveNode_ChangeMeta(t *testing.T) {
if e.Event != NodeUpdate {
t.Fatalf("bad event: %v", e)
}
if e.Node != &state.Node {
t.Fatalf("bad event: %v", e)
if !reflect.DeepEqual(*e.Node, state.Node) {
t.Fatalf("expected %v, got %v", *e.Node, state.Node)
}
if bytes.Compare(e.Node.Meta, a.Meta) != 0 {
t.Fatalf("meta did not update")
Expand Down
4 changes: 0 additions & 4 deletions transport_test.go
Expand Up @@ -19,7 +19,6 @@ func TestTransport_Join(t *testing.T) {
c1 := DefaultLANConfig()
c1.Name = "node1"
c1.Transport = t1
c1.Logger = testLogger(t)
m1, err := Create(c1)
if err != nil {
t.Fatalf("err: %v", err)
Expand All @@ -31,7 +30,6 @@ func TestTransport_Join(t *testing.T) {
c2 := DefaultLANConfig()
c2.Name = "node2"
c2.Transport = net.NewTransport("node2")
c2.Logger = testLogger(t)
m2, err := Create(c2)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -67,7 +65,6 @@ func TestTransport_Send(t *testing.T) {
c1.Name = "node1"
c1.Transport = t1
c1.Delegate = d1
c1.Logger = testLogger(t)
m1, err := Create(c1)
if err != nil {
t.Fatalf("err: %v", err)
Expand All @@ -79,7 +76,6 @@ func TestTransport_Send(t *testing.T) {
c2 := DefaultLANConfig()
c2.Name = "node2"
c2.Transport = net.NewTransport("node2")
c2.Logger = testLogger(t)
m2, err := Create(c2)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down
18 changes: 9 additions & 9 deletions util.go
Expand Up @@ -119,35 +119,35 @@ func moveDeadNodes(nodes []*nodeState, gossipToTheDeadTime time.Duration) int {
return n - numDead
}

// kRandomNodes is used to select up to k random nodes, excluding any nodes where
// the filter function returns true. It is possible that less than k nodes are
// kRandomNodes is used to select up to k random Nodes, excluding any nodes where
// the exclude function returns true. It is possible that less than k nodes are
// returned.
func kRandomNodes(k int, nodes []*nodeState, filterFn func(*nodeState) bool) []*nodeState {
func kRandomNodes(k int, nodes []*nodeState, exclude func(*nodeState) bool) []Node {
n := len(nodes)
kNodes := make([]*nodeState, 0, k)
kNodes := make([]Node, 0, k)
OUTER:
// Probe up to 3*n times, with large n this is not necessary
// since k << n, but with small n we want search to be
// exhaustive
for i := 0; i < 3*n && len(kNodes) < k; i++ {
// Get random node
// Get random nodeState
idx := randomOffset(n)
node := nodes[idx]
state := nodes[idx]

// Give the filter a shot at it.
if filterFn != nil && filterFn(node) {
if exclude != nil && exclude(state) {
continue OUTER
}

// Check if we have this node already
for j := 0; j < len(kNodes); j++ {
if node == kNodes[j] {
if state.Node.Name == kNodes[j].Name {
continue OUTER
}
}

// Append the node
kNodes = append(kNodes, node)
kNodes = append(kNodes, state.Node)
}
return kNodes
}
Expand Down
2 changes: 1 addition & 1 deletion util_test.go
Expand Up @@ -257,7 +257,7 @@ func TestKRandomNodes(t *testing.T) {
t.Fatalf("unexpected equal")
}

for _, s := range [][]*nodeState{s1, s2, s3} {
for _, s := range [][]Node{s1, s2, s3} {
if len(s) != 3 {
t.Fatalf("bad len")
}
Expand Down

0 comments on commit a43ca4a

Please sign in to comment.