Skip to content

Commit

Permalink
Fix peer access race with raft heartbeat. (#471)
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonRichardson authored and mkeeler committed Nov 3, 2021
1 parent 7e73975 commit 6678e92
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 24 deletions.
15 changes: 12 additions & 3 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,18 @@ func (r *Raft) startStopReplication() {
r.goFunc(func() { r.replicate(s) })
asyncNotifyCh(s.triggerCh)
r.observe(PeerObservation{Peer: server, Removed: false})
} else if ok && s.peer.Address != server.Address {
r.logger.Info("updating peer", "peer", server.ID)
s.peer = server
} else if ok {

s.peerLock.RLock()
peer := s.peer
s.peerLock.RUnlock()

if peer.Address != server.Address {
r.logger.Info("updating peer", "peer", server.ID)
s.peerLock.Lock()
s.peer = server
s.peerLock.Unlock()
}
}
}

Expand Down
69 changes: 48 additions & 21 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type followerReplication struct {

// peer contains the network address and ID of the remote follower.
peer Server
// peerLock protects 'peer'
peerLock sync.RWMutex

// commitment tracks the entries acknowledged by followers so that the
// leader's commit index can advance. It is updated on successful
Expand Down Expand Up @@ -182,7 +184,10 @@ PIPELINE:
// to standard mode on failure.
if err := r.pipelineReplicate(s); err != nil {
if err != ErrPipelineReplicationNotSupported {
r.logger.Error("failed to start pipeline replication to", "peer", s.peer, "error", err)
s.peerLock.RLock()
peer := s.peer
s.peerLock.RUnlock()
r.logger.Error("failed to start pipeline replication to", "peer", peer, "error", err)
}
}
goto RPC
Expand All @@ -196,6 +201,8 @@ func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop
var req AppendEntriesRequest
var resp AppendEntriesResponse
var start time.Time
var peer Server

START:
// Prevent an excessive retry rate on errors
if s.failures > 0 {
Expand All @@ -205,6 +212,10 @@ START:
}
}

s.peerLock.RLock()
peer = s.peer
s.peerLock.RUnlock()

// Setup the request
if err := r.setupAppendEntries(s, &req, atomic.LoadUint64(&s.nextIndex), lastIndex); err == ErrLogNotFound {
goto SEND_SNAP
Expand All @@ -214,12 +225,12 @@ START:

// Make the RPC call
start = time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err)
if err := r.trans.AppendEntries(peer.ID, peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to appendEntries to", "peer", peer, "error", err)
s.failures++
return
}
appendStats(string(s.peer.ID), start, float32(len(req.Entries)))
appendStats(string(peer.ID), start, float32(len(req.Entries)))

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand All @@ -245,7 +256,7 @@ START:
} else {
s.failures++
}
r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex))
r.logger.Warn("appendEntries rejected, sending older logs", "peer", peer, "next", atomic.LoadUint64(&s.nextIndex))
}

CHECK_MORE:
Expand All @@ -272,7 +283,7 @@ SEND_SNAP:
if stop, err := r.sendLatestSnapshot(s); stop {
return true
} else if err != nil {
r.logger.Error("failed to send snapshot to", "peer", s.peer, "error", err)
r.logger.Error("failed to send snapshot to", "peer", peer, "error", err)
return
}

Expand Down Expand Up @@ -318,18 +329,22 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
ConfigurationIndex: meta.ConfigurationIndex,
}

s.peerLock.RLock()
peer := s.peer
s.peerLock.RUnlock()

// Make the call
start := time.Now()
var resp InstallSnapshotResponse
if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil {
if err := r.trans.InstallSnapshot(peer.ID, peer.Address, &req, &resp, snapshot); err != nil {
r.logger.Error("failed to install snapshot", "id", snapID, "error", err)
s.failures++
return false, err
}
labels := []metrics.Label{{Name: "peer_id", Value: string(s.peer.ID)}}
labels := []metrics.Label{{Name: "peer_id", Value: string(peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "installSnapshot"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(s.peer.ID)}, start)
metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(peer.ID)}, start)

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand All @@ -344,7 +359,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
if resp.Success {
// Update the indexes
atomic.StoreUint64(&s.nextIndex, meta.Index+1)
s.commitment.match(s.peer.ID, meta.Index)
s.commitment.match(peer.ID, meta.Index)

// Clear any failures
s.failures = 0
Expand All @@ -353,7 +368,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
s.notifyAll(true)
} else {
s.failures++
r.logger.Warn("installSnapshot rejected to", "peer", s.peer)
r.logger.Warn("installSnapshot rejected to", "peer", peer)
}
return false, nil
}
Expand All @@ -378,25 +393,29 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
return
}

s.peerLock.RLock()
peer := s.peer
s.peerLock.RUnlock()

start := time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err)
r.observe(FailedHeartbeatObservation{PeerID: s.peer.ID, LastContact: s.LastContact()})
if err := r.trans.AppendEntries(peer.ID, peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to heartbeat to", "peer", peer.Address, "error", err)
r.observe(FailedHeartbeatObservation{PeerID: peer.ID, LastContact: s.LastContact()})
failures++
select {
case <-time.After(backoff(failureWait, failures, maxFailureScale)):
case <-stopCh:
}
} else {
if failures > 0 {
r.observe(ResumedHeartbeatObservation{PeerID: s.peer.ID})
r.observe(ResumedHeartbeatObservation{PeerID: peer.ID})
}
s.setLastContact()
failures = 0
labels := []metrics.Label{{Name: "peer_id", Value: string(s.peer.ID)}}
labels := []metrics.Label{{Name: "peer_id", Value: string(peer.ID)}}
metrics.MeasureSinceWithLabels([]string{"raft", "replication", "heartbeat"}, start, labels)
// Duplicated information. Kept for backward compatibility.
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(peer.ID)}, start)
s.notifyAll(resp.Success)
}
}
Expand All @@ -407,16 +426,20 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
// We only pipeline AppendEntries commands, and if we ever hit an error, we fall
// back to the standard replication which can handle more complex situations.
func (r *Raft) pipelineReplicate(s *followerReplication) error {
s.peerLock.RLock()
peer := s.peer
s.peerLock.RUnlock()

// Create a new pipeline
pipeline, err := r.trans.AppendEntriesPipeline(s.peer.ID, s.peer.Address)
pipeline, err := r.trans.AppendEntriesPipeline(peer.ID, peer.Address)
if err != nil {
return err
}
defer pipeline.Close()

// Log start and stop of pipeline
r.logger.Info("pipelining replication", "peer", s.peer)
defer r.logger.Info("aborting pipeline replication", "peer", s.peer)
r.logger.Info("pipelining replication", "peer", peer)
defer r.logger.Info("aborting pipeline replication", "peer", peer)

// Create a shutdown and finish channel
stopCh := make(chan struct{})
Expand Down Expand Up @@ -496,8 +519,12 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,
for {
select {
case ready := <-respCh:
s.peerLock.RLock()
peer := s.peer
s.peerLock.RUnlock()

req, resp := ready.Request(), ready.Response()
appendStats(string(s.peer.ID), ready.Start(), float32(len(req.Entries)))
appendStats(string(peer.ID), ready.Start(), float32(len(req.Entries)))

// Check for a newer term, stop running
if resp.Term > req.Term {
Expand Down

0 comments on commit 6678e92

Please sign in to comment.