Skip to content

Commit

Permalink
Ensure RoundStateDB is closed when core is stopped (#1996)
Browse files Browse the repository at this point in the history
Prior to this change the RoundStateDB was never closed. This could
result in leaked goroutines from its internal leveldb instance.

This change caused the TestStartStopValidators e2e test to fail because
changing the rsdb open to be in the startup code, slowed down the start
method for core this caused the gossiping of the manually added peers
(done in the test after starting each previously stopped node )to happen
after the validator peer handler had removed all the peers resulting in
gossiping certs to no one. Thus leaving the network in a disconnected
state.

The fix was really hacky, but just to wait 250ms after starting a node
and hoping that the validator peer handler will have removed nodes by
that point and then add the peers and then wait a little for them to be
registered and then finally gossip to them.

Co-authored-by: Paul Lange <palango@users.noreply.github.com>
  • Loading branch information
piersy and palango committed Jan 17, 2023
1 parent f92e923 commit 6e8588c
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 8 deletions.
5 changes: 0 additions & 5 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,6 @@ type core struct {

// New creates an Istanbul consensus core
func New(backend CoreBackend, config *istanbul.Config) Engine {
rsdb, err := newRoundStateDB(config.RoundStateDBPath, nil)
if err != nil {
log.Crit("Failed to open RoundStateDB", "err", err)
}

c := &core{
config: config,
Expand All @@ -167,7 +163,6 @@ func New(backend CoreBackend, config *istanbul.Config) Engine {
pendingRequests: prque.New(nil),
pendingRequestsMu: new(sync.Mutex),
consensusTimestamp: time.Time{},
rsdb: rsdb,
consensusPrepareTimeGauge: metrics.NewRegisteredGauge("consensus/istanbul/core/consensus_prepare", nil),
consensusCommitTimeGauge: metrics.NewRegisteredGauge("consensus/istanbul/core/consensus_commit", nil),
verifyGauge: metrics.NewRegisteredGauge("consensus/istanbul/core/verify", nil),
Expand Down
10 changes: 8 additions & 2 deletions consensus/istanbul/core/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ import (
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/celo-org/celo-blockchain/consensus/istanbul"
"github.com/celo-org/celo-blockchain/log"
)

// Start implements core.Engine.Start
func (c *core) Start() error {

rsdb, err := newRoundStateDB(c.config.RoundStateDBPath, nil)
if err != nil {
log.Crit("Failed to open RoundStateDB", "err", err)
}
c.rsdb = rsdb
roundState, err := c.createRoundState()
if err != nil {
return err
Expand Down Expand Up @@ -63,10 +68,11 @@ func (c *core) Stop() error {
// Make sure the handler goroutine exits
c.handlerWg.Wait()

err := c.rsdb.Close()
c.currentMu.Lock()
defer c.currentMu.Unlock()
c.current = nil
return nil
return err
}

// ----------------------------------------------------------------------------
Expand Down
16 changes: 16 additions & 0 deletions e2e_test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ func TestStartStopValidators(t *testing.T) {
err = network[2].Start()
require.NoError(t, err)

// We need to wait here to allow the call to "Backend.RefreshValPeers" to
// complete before adding peers. This is because "Backend.RefreshValPeers"
// deletes all peers and then re-adds any peers from the cached
// connections, but in the case that peers were recently added there may
// not have been enough time to connect to them and populate the connection
// cache, and in that case "Backend.RefreshValPeers" simply removes all the
// peers.
time.Sleep(250 * time.Millisecond)
// Connect last stopped node to running nodes
network[2].AddPeers(network[:2]...)
time.Sleep(25 * time.Millisecond)
Expand All @@ -221,6 +229,14 @@ func TestStartStopValidators(t *testing.T) {
err = network[3].Start()
require.NoError(t, err)

// We need to wait here to allow the call to "Backend.RefreshValPeers" to
// complete before adding peers. This is because "Backend.RefreshValPeers"
// deletes all peers and then re-adds any peers from the cached
// connections, but in the case that peers were recently added there may
// not have been enough time to connect to them and populate the connection
// cache, and in that case "Backend.RefreshValPeers" simply removes all the
// peers.
time.Sleep(250 * time.Millisecond)
// Connect final node to rest of network
network[3].AddPeers(network[:3]...)
time.Sleep(25 * time.Millisecond)
Expand Down
5 changes: 4 additions & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ func (w *worker) stop() {
atomic.StoreInt32(&w.running, 0)

if istanbul, ok := w.engine.(consensus.Istanbul); ok {
istanbul.StopValidating()
err := istanbul.StopValidating()
if err != nil {
log.Error("Error while calling engine.StopValidating", "err", err)
}
}
}

Expand Down

0 comments on commit 6e8588c

Please sign in to comment.