Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a data race, and add support for fixing another one in Serf #238

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions config.go
Expand Up @@ -16,6 +16,10 @@ type Config struct {
// The name of this node. This must be unique in the cluster.
Name string

// Meta data associated with the local node. The slice must have a length
// less than MetaMaxSize.
Meta []byte

// Transport is a hook for providing custom code to communicate with
// other nodes. If this is left nil, then memberlist will by default
// make a NetTransport using BindAddr and BindPort from this structure.
Expand Down
3 changes: 3 additions & 0 deletions delegate.go
Expand Up @@ -7,6 +7,9 @@ type Delegate interface {
// NodeMeta is used to retrieve meta-data about the current node
// when broadcasting an alive message. It's length is limited to
// the given byte size. This metadata is available in the Node structure.
//
// Deprecated: set Config.Meta for initial node meta, and pass any updates to
// Memberlist.UpdateNodeMeta.
NodeMeta(limit int) []byte

// NotifyMsg is called when a user-data message is received.
Expand Down
37 changes: 22 additions & 15 deletions memberlist.go
Expand Up @@ -187,6 +187,13 @@ func newMemberlist(conf *Config) (*Memberlist, error) {
nodeAwareTransport = &shimNodeAwareTransport{transport}
}

if len(conf.Meta) == 0 && conf.Delegate != nil {
conf.Meta = conf.Delegate.NodeMeta(MetaMaxSize)
}
if len(conf.Meta) > MetaMaxSize {
return nil, fmt.Errorf("meta data (%d) is longer than the limit (%d)", len(conf.Meta), MetaMaxSize)
}

m := &Memberlist{
config: conf,
shutdownCh: make(chan struct{}),
Expand Down Expand Up @@ -430,21 +437,12 @@ func (m *Memberlist) setAlive() error {
m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
}

// Set any metadata from the delegate.
var meta []byte
if m.config.Delegate != nil {
meta = m.config.Delegate.NodeMeta(MetaMaxSize)
if len(meta) > MetaMaxSize {
panic("Node meta data provided is longer than the limit")
}
}

a := alive{
Incarnation: m.nextIncarnation(),
Node: m.config.Name,
Addr: addr,
Port: uint16(port),
Meta: meta,
Meta: m.config.Meta,
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&a, nil, true)
Expand Down Expand Up @@ -488,20 +486,28 @@ func (m *Memberlist) LocalNode() *Node {
// meta data. This will block until the update message is successfully
// broadcasted to a member of the cluster, if any exist or until a specified
// timeout is reached.
// Deprecated: use UpdateNodeMeta
func (m *Memberlist) UpdateNode(timeout time.Duration) error {
// Get the node meta data
var meta []byte
if m.config.Delegate != nil {
meta = m.config.Delegate.NodeMeta(MetaMaxSize)
if len(meta) > MetaMaxSize {
panic("Node meta data provided is longer than the limit")
}
}
return m.UpdateNodeMeta(timeout, meta)
}

// UpdateNodeMeta is used to trigger re-advertising the local node. This is
// primarily used with a Delegate to support dynamic updates to the local
// meta data. This will block until the update message is successfully
// broadcasted to a member of the cluster, if any exist or until a specified
// timeout is reached.
func (m *Memberlist) UpdateNodeMeta(timeout time.Duration, meta []byte) error {
if len(meta) > MetaMaxSize {
panic("Node meta data provided is longer than the limit")
}

// Get the existing node
m.nodeLock.RLock()
state := m.nodeMap[m.config.Name]
m.nodeLock.RUnlock()

// Format a new alive message
a := alive{
Expand All @@ -512,6 +518,7 @@ func (m *Memberlist) UpdateNode(timeout time.Duration) error {
Meta: meta,
Vsn: m.config.BuildVsnArray(),
}
m.nodeLock.RUnlock()
notifyCh := make(chan struct{})
m.aliveNode(&a, notifyCh, true)

Expand Down
7 changes: 1 addition & 6 deletions memberlist_test.go
Expand Up @@ -1114,8 +1114,6 @@ func TestMemberlist_delegateMeta_Update(t *testing.T) {

c2 := testConfig(t)
c2.BindPort = bindPort
mock2 := &MockDelegate{meta: []byte("lb")}
c2.Delegate = mock2

m2, err := Create(c2)
require.NoError(t, err)
Expand All @@ -1126,13 +1124,10 @@ func TestMemberlist_delegateMeta_Update(t *testing.T) {

yield()

// Update the meta data roles
mock1.setMeta([]byte("api"))
mock2.setMeta([]byte("db"))

err = m1.UpdateNode(0)
require.NoError(t, err)
err = m2.UpdateNode(0)
err = m2.UpdateNodeMeta(0, []byte("db"))
require.NoError(t, err)

yield()
Expand Down