
Commit

Merge pull request #1705 from celo-org/piersy/add-start-stop-e2e-test-rebased

Add start stop e2e test
mcortesi committed Dec 16, 2021
2 parents b5cc595 + 1ab51ee commit 75f0db3
Showing 21 changed files with 515 additions and 202 deletions.
3 changes: 1 addition & 2 deletions consensus/istanbul/backend/backend.go
@@ -550,8 +550,7 @@ func (sb *Backend) Commit(proposal istanbul.Proposal, aggregatedSeal types.Istan
return err
}
}
go sb.onNewConsensusBlock(block, result.Receipts, result.Logs, result.State)

sb.onNewConsensusBlock(block, result.Receipts, result.Logs, result.State)
return nil
}

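The hunk above drops the go keyword, so onNewConsensusBlock now runs to completion inside Commit rather than in a detached goroutine. The following standalone sketch (not the Celo code; onNewBlock, commitAsync and commitSync are illustrative names) shows why that matters for a clean stop: a fire-and-forget goroutine gives the caller no guarantee that the handler has finished by the time the commit call returns.

package main

import (
	"fmt"
	"time"
)

// onNewBlock stands in for post-commit processing such as onNewConsensusBlock.
func onNewBlock(n int) {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("handled block", n)
}

// commitAsync fires the handler in a detached goroutine: it returns
// immediately and nothing tells the caller when (or whether) the work ran.
func commitAsync(n int) {
	go onNewBlock(n)
	fmt.Println("commitAsync returned for block", n)
}

// commitSync runs the handler inline: when it returns, the work is done,
// so a subsequent shutdown cannot race with it.
func commitSync(n int) {
	onNewBlock(n)
	fmt.Println("commitSync returned for block", n)
}

func main() {
	commitAsync(1)
	commitSync(2)
	time.Sleep(50 * time.Millisecond) // only so the detached goroutine gets to run in this demo
}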
5 changes: 5 additions & 0 deletions consensus/istanbul/backend/handler.go
@@ -346,6 +346,11 @@ func (sb *Backend) newChainHead(newBlock *types.Block) {

sb.logger.Info("Validator Election Results", "address", sb.ValidatorAddress(), "elected", valSetIndex >= 0, "number", newBlock.Number().Uint64())

// We lock here to protect access to announceRunning because
// announceRunning is also accessed in StartAnnouncing and
// StopAnnouncing.
sb.announceMu.Lock()
defer sb.announceMu.Unlock()
if sb.announceRunning {
sb.logger.Trace("At end of epoch and going to refresh validator peers", "new_block_number", newBlock.Number().Uint64())
if err := sb.RefreshValPeers(); err != nil {
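The new comment states the intent: announceRunning is written by StartAnnouncing and StopAnnouncing and read at chain-head time, so all three paths must hold the same mutex. Below is a minimal sketch of that pattern with illustrative names (announcer, onNewChainHead), not the actual Backend methods.

package main

import "sync"

// announcer is a stand-in for the parts of Backend that guard announceRunning.
type announcer struct {
	mu      sync.Mutex // protects running
	running bool
}

func (a *announcer) StartAnnouncing() {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.running = true
}

func (a *announcer) StopAnnouncing() {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.running = false
}

// onNewChainHead only refreshes peers while announcing is running; holding
// the lock keeps the check consistent with concurrent Start/Stop calls.
func (a *announcer) onNewChainHead() {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.running {
		// refresh validator peers here
	}
}

func main() {
	a := &announcer{}
	a.StartAnnouncing()
	a.onNewChainHead()
	a.StopAnnouncing()
}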
2 changes: 1 addition & 1 deletion consensus/istanbul/backend/peer_handler.go
@@ -51,6 +51,7 @@ func (vph *validatorPeerHandler) startThread() error {
}

vph.threadRunning = true
vph.threadWg.Add(1)
go vph.thread()

return nil
@@ -72,7 +73,6 @@ func (vph *validatorPeerHandler) stopThread() error {
}

func (vph *validatorPeerHandler) thread() {
vph.threadWg.Add(1)
defer vph.threadWg.Done()

refreshValidatorPeersTicker := time.NewTicker(1 * time.Minute)
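Moving threadWg.Add(1) out of the goroutine and into startThread closes a classic sync.WaitGroup race: if Add only runs inside the new goroutine, a stopThread that calls Wait immediately afterwards can observe a zero counter and return before the goroutine has even registered. A standalone sketch of the two shapes (startBuggy and startFixed are illustrative names, not the Celo code):

package main

import (
	"sync"
	"time"
)

// startBuggy registers with the WaitGroup inside the goroutine, so a
// concurrent Wait can return before Add has executed. It is shown only
// for contrast and intentionally never called.
func startBuggy(wg *sync.WaitGroup) {
	go func() {
		wg.Add(1) // races with Wait in the stopper
		defer wg.Done()
		time.Sleep(10 * time.Millisecond)
	}()
}

// startFixed does the Add before spawning the goroutine, so any later Wait
// is guaranteed to see the pending count.
func startFixed(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond)
	}()
}

func main() {
	var wg sync.WaitGroup
	startFixed(&wg)
	wg.Wait() // blocks until the goroutine has finished
}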
20 changes: 18 additions & 2 deletions consensus/istanbul/core/core.go
@@ -115,8 +115,9 @@ type core struct {
finalCommittedSub *event.TypeMuxSubscription
timeoutSub *event.TypeMuxSubscription

futurePreprepareTimer *time.Timer
resendRoundChangeMessageTimer *time.Timer
futurePreprepareTimer *time.Timer
resendRoundChangeMessageTimer *time.Timer
resendRoundChangeMessageTimerMu sync.Mutex

roundChangeTimer *time.Timer
roundChangeTimerMu sync.RWMutex
@@ -127,6 +128,7 @@ type core struct {

rsdb RoundStateDB
current RoundState
currentMu sync.RWMutex
handlerWg *sync.WaitGroup

roundChangeSet *roundChangeSet
@@ -191,6 +193,11 @@ func (c *core) SetAddress(address common.Address) {
}

func (c *core) CurrentView() *istanbul.View {
// CurrentView is called by Prepare, which is called from the miner.worker
// main loop, so we need to synchronise this access with the write that
// occurs in Stop, which is called from the miner's update loop.
c.currentMu.RLock()
defer c.currentMu.RUnlock()
if c.current == nil {
return nil
}
@@ -200,6 +207,11 @@ func (c *core) CurrentView() *istanbul.View {
func (c *core) CurrentRoundState() RoundState { return c.current }

func (c *core) ParentCommits() MessageSet {
// ParentCommits is called by Prepare, which is called from the miner.worker
// main loop, so we need to synchronise this access with the write that
// occurs in Stop, which is called from the miner's update loop.
c.currentMu.RLock()
defer c.currentMu.RUnlock()
if c.current == nil {
return nil
}
@@ -682,6 +694,8 @@ func (c *core) stopRoundChangeTimer() {
}

func (c *core) stopResendRoundChangeTimer() {
c.resendRoundChangeMessageTimerMu.Lock()
defer c.resendRoundChangeMessageTimerMu.Unlock()
if c.resendRoundChangeMessageTimer != nil {
c.resendRoundChangeMessageTimer.Stop()
c.resendRoundChangeMessageTimer = nil
@@ -771,6 +785,8 @@ func (c *core) resetResendRoundChangeTimer() {
resendTimeout = maxResendTimeout
}
view := &istanbul.View{Sequence: c.current.Sequence(), Round: c.current.DesiredRound()}
c.resendRoundChangeMessageTimerMu.Lock()
defer c.resendRoundChangeMessageTimerMu.Unlock()
c.resendRoundChangeMessageTimer = time.AfterFunc(resendTimeout, func() {
c.sendEvent(resendRoundChangeEvent{view})
})
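The core.go changes introduce two locks: resendRoundChangeMessageTimerMu serialises stopping and re-arming the resend timer, and currentMu lets readers such as CurrentView and ParentCommits synchronise with the write in Stop that clears current. A minimal sketch of the timer half of that pattern, under illustrative names (resender, reset, stop) rather than the actual core API:

package main

import (
	"sync"
	"time"
)

// resender guards a *time.Timer that one goroutine arms and another stops.
type resender struct {
	mu    sync.Mutex // protects timer
	timer *time.Timer
}

// reset stops any previously armed timer and arms a new one, atomically
// with respect to stop.
func (r *resender) reset(d time.Duration, fire func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.timer != nil {
		r.timer.Stop()
	}
	r.timer = time.AfterFunc(d, fire)
}

// stop cancels the pending timer, if any, and clears the field.
func (r *resender) stop() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.timer != nil {
		r.timer.Stop()
		r.timer = nil
	}
}

func main() {
	r := &resender{}
	r.reset(5*time.Millisecond, func() {})
	r.stop()
	time.Sleep(10 * time.Millisecond)
}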
6 changes: 4 additions & 2 deletions consensus/istanbul/core/handler.go
@@ -47,6 +47,8 @@ func (c *core) Start() error {
// Tests handle events themselves, so subscribeEvents() has to be
// callable from tests.
c.subscribeEvents()

c.handlerWg.Add(1)
go c.handleEvents()

return nil
@@ -60,6 +62,8 @@ func (c *core) Stop() error {
// Make sure the handler goroutine exits
c.handlerWg.Wait()

c.currentMu.Lock()
defer c.currentMu.Unlock()
c.current = nil
return nil
}
@@ -95,8 +99,6 @@ func (c *core) handleEvents() {
// Clear state
defer c.handlerWg.Done()

c.handlerWg.Add(1)

for {
logger := c.newLogger("func", "handleEvents")
select {
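Taken together, the handler.go hunks give Start and Stop the usual lifecycle shape: Add before the goroutine is spawned, Wait in Stop once the handler has been told to exit, and only then take the write lock before clearing the state that readers guard with RLock. A standalone sketch of that shape, with illustrative names (engine, loop, View) and a quit channel standing in for the real event-subscription teardown:

package main

import (
	"fmt"
	"sync"
)

// engine is a stand-in for core: one handler goroutine, a WaitGroup to join
// it, and an RWMutex guarding state that Stop clears.
type engine struct {
	wg      sync.WaitGroup
	quit    chan struct{} // stand-in for unsubscribing the event loop
	stateMu sync.RWMutex  // protects state
	state   *int
}

func (e *engine) Start() {
	v := 42
	e.state = &v
	e.quit = make(chan struct{})
	e.wg.Add(1) // register before the goroutine exists
	go e.loop()
}

func (e *engine) loop() {
	defer e.wg.Done()
	<-e.quit // the real loop would drain events until its subscription closes
}

func (e *engine) Stop() {
	close(e.quit)
	e.wg.Wait() // the handler goroutine has exited
	e.stateMu.Lock()
	defer e.stateMu.Unlock()
	e.state = nil
}

// View is the reader side, mirroring CurrentView/ParentCommits.
func (e *engine) View() *int {
	e.stateMu.RLock()
	defer e.stateMu.RUnlock()
	return e.state
}

func main() {
	e := &engine{}
	e.Start()
	fmt.Println(*e.View()) // 42
	e.Stop()
	fmt.Println(e.View() == nil) // true
}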
