Skip to content

Commit

Permalink
consul: Adding reconcilation to handle reaped Serf nodes. Fixes #15.
Browse files Browse the repository at this point in the history
  • Loading branch information
armon committed Apr 3, 2014
1 parent 38ae471 commit ed39e90
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 0 deletions.
56 changes: 56 additions & 0 deletions consul/leader.go
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"net"
"strconv"
"time"
)

Expand Down Expand Up @@ -102,10 +103,65 @@ WAIT:
func (s *Server) reconcile() (err error) {
defer metrics.MeasureSince([]string{"consul", "leader", "reconcile"}, time.Now())
members := s.serfLAN.Members()
knownMembers := make(map[string]struct{})
for _, member := range members {
if err := s.reconcileMember(member); err != nil {
return err
}
knownMembers[member.Name] = struct{}{}
}

// Reconcile any members that have been reaped while we were not the leader
return s.reconcileReaped(knownMembers)
}

// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for SerfCheckID
// in a crticial state that does not correspond to a known Serf member. We generate
// a "reap" event to cause the node to be cleaned up.
func (s *Server) reconcileReaped(known map[string]struct{}) error {
state := s.fsm.State()
_, critical := state.ChecksInState(structs.HealthCritical)
for _, check := range critical {
// Ignore any non serf checks
if check.CheckID != SerfCheckID {
continue
}

// Check if this node is "known" by serf
if _, ok := known[check.Node]; ok {
continue
}

// Create a fake member
member := serf.Member{
Name: check.Node,
Tags: map[string]string{
"dc": s.config.Datacenter,
"role": "node",
},
}

// Get the node services, look for ConsulServiceID
_, services := state.NodeServices(check.Node)
serverPort := 0
for _, service := range services.Services {
if service.ID == ConsulServiceID {
serverPort = service.Port
break
}
}

// Create the appropriate tags if this was a server node
if serverPort > 0 {
member.Tags["role"] = "consul"
member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10)
}

// Attempt to reap this member
if err := s.handleReapMember(member); err != nil {
return err
}
}
return nil
}
Expand Down
38 changes: 38 additions & 0 deletions consul/leader_test.go
Expand Up @@ -208,6 +208,44 @@ func TestLeader_ReapMember(t *testing.T) {
}
}

func TestLeader_Reconcile_ReapMember(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()

// Wait until we have a leader
time.Sleep(100 * time.Millisecond)

// Register a non-existing member
dead := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: "no-longer-around",
Address: "127.1.1.1",
Check: &structs.HealthCheck{
Node: "no-longer-around",
CheckID: SerfCheckID,
Name: SerfCheckName,
Status: structs.HealthCritical,
},
}
var out struct{}
if err := s1.RPC("Catalog.Register", &dead, &out); err != nil {
t.Fatalf("err: %v", err)
}

// Force a reconciliation
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}

// Node should be gone
state := s1.fsm.State()
_, found, _ := state.GetNode("no-longer-around")
if found {
t.Fatalf("client registered")
}
}

func TestLeader_Reconcile(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
Expand Down

0 comments on commit ed39e90

Please sign in to comment.