Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
actions-user committed Sep 28, 2023
2 parents 2df2a65 + 73f5bcb commit e703cdb
Show file tree
Hide file tree
Showing 21 changed files with 244 additions and 148 deletions.
17 changes: 8 additions & 9 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func (b *SimulatedBackend) CodeAt(ctx context.Context, contract common.Address,
if err != nil {
return nil, err
}

return stateDB.GetCode(contract), nil
}

Expand All @@ -212,7 +211,6 @@ func (b *SimulatedBackend) BalanceAt(ctx context.Context, contract common.Addres
if err != nil {
return nil, err
}

return stateDB.GetBalance(contract), nil
}

Expand All @@ -225,7 +223,6 @@ func (b *SimulatedBackend) NonceAt(ctx context.Context, contract common.Address,
if err != nil {
return 0, err
}

return stateDB.GetNonce(contract), nil
}

Expand All @@ -238,7 +235,6 @@ func (b *SimulatedBackend) StorageAt(ctx context.Context, contract common.Addres
if err != nil {
return nil, err
}

val := stateDB.GetState(contract, key)
return val[:], nil
}
Expand Down Expand Up @@ -700,8 +696,10 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
}
block.AddTxWithChain(b.blockchain, tx)
})
stateDB, _ := b.blockchain.State()

stateDB, err := b.blockchain.State()
if err != nil {
return err
}
b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)
b.pendingReceipts = receipts[0]
Expand Down Expand Up @@ -821,11 +819,12 @@ func (b *SimulatedBackend) AdjustTime(adjustment time.Duration) error {
blocks, _ := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) {
block.OffsetTime(int64(adjustment.Seconds()))
})
stateDB, _ := b.blockchain.State()

stateDB, err := b.blockchain.State()
if err != nil {
return err
}
b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)

return nil
}

Expand Down
1 change: 1 addition & 0 deletions cmd/devp2p/internal/ethtest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func setupGeth(stack *node.Node) error {
if err != nil {
return err
}
backend.SetSynced()

_, err = backend.BlockChain().InsertChain(chain.blocks[1:])
return err
Expand Down
54 changes: 19 additions & 35 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,17 +338,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if err := bc.loadLastState(); err != nil {
return nil, err
}
// Make sure the state associated with the block is available
// Make sure the state associated with the block is available, or log out
// if there is no available state, waiting for state sync.
head := bc.CurrentBlock()
if !bc.HasState(head.Root) {
if head.Number.Uint64() == 0 {
// The genesis state is missing, which is only possible in the path-based
// scheme. This situation occurs when the state syncer overwrites it.
//
// The solution is to reset the state to the genesis state. Although it may not
// match the sync target, the state healer will later address and correct any
// inconsistencies.
bc.resetState()
// scheme. This situation occurs when the initial state sync is not finished
// yet, or the chain head is rewound below the pivot point. In both scenario,
// there is no possible recovery approach except for rerunning a snap sync.
// Do nothing here until the state syncer picks it up.
log.Info("Genesis state is missing, wait state sync")
} else {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
Expand Down Expand Up @@ -631,28 +631,6 @@ func (bc *BlockChain) SetSafe(header *types.Header) {
}
}

// resetState resets the persistent state to genesis state if it's not present.
func (bc *BlockChain) resetState() {
// Short circuit if the genesis state is already present.
root := bc.genesisBlock.Root()
if bc.HasState(root) {
return
}
// Reset the state database to empty for committing genesis state.
// Note, it should only happen in path-based scheme and Reset function
// is also only call-able in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(types.EmptyRootHash); err != nil {
log.Crit("Failed to clean state", "err", err) // Shouldn't happen
}
}
// Write genesis state into database.
if err := CommitGenesisState(bc.db, bc.triedb, bc.genesisBlock.Hash()); err != nil {
log.Crit("Failed to commit genesis state", "err", err)
}
log.Info("Reset state to genesis", "root", root)
}

// setHeadBeyondRoot rewinds the local chain to a new head with the extra condition
// that the rewind must pass the specified state root. This method is meant to be
// used when rewinding with snapshots enabled to ensure that we go back further than
Expand Down Expand Up @@ -688,7 +666,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
bc.resetState()
} else {
// Block exists, keep rewinding until we find one with state,
// keeping rewinding until we exceed the optional threshold
Expand Down Expand Up @@ -716,16 +693,14 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
}
}
if beyondRoot || newHeadBlock.NumberU64() == 0 {
if newHeadBlock.NumberU64() == 0 {
bc.resetState()
} else if !bc.HasState(newHeadBlock.Root()) {
if !bc.HasState(newHeadBlock.Root()) && bc.stateRecoverable(newHeadBlock.Root()) {
// Rewind to a block with recoverable state. If the state is
// missing, run the state recovery here.
if err := bc.triedb.Recover(newHeadBlock.Root()); err != nil {
log.Crit("Failed to rollback state", "err", err) // Shouldn't happen
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
}
log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root())
Expand All @@ -740,6 +715,15 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// to low, so it's safe to update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock.Header())
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))

// The head state is missing, which is only possible in the path-based
// scheme. This situation occurs when the chain head is rewound below
// the pivot point. In this scenario, there is no possible recovery
// approach except for rerunning a snap sync. Do nothing here until the
// state syncer picks it up.
if !bc.HasState(newHeadBlock.Root()) {
log.Info("Chain is stateless, wait state sync", "number", newHeadBlock.Number(), "hash", newHeadBlock.Hash())
}
}
// Rewind the snap block in a simpleton way to the target head
if currentSnapBlock := bc.CurrentSnapBlock(); currentSnapBlock != nil && header.Number.Uint64() < currentSnapBlock.Number.Uint64() {
Expand Down Expand Up @@ -839,7 +823,7 @@ func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error {
// Reset the trie database with the fresh snap synced state.
root := block.Root()
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(root); err != nil {
if err := bc.triedb.Enable(root); err != nil {
return err
}
}
Expand Down
22 changes: 22 additions & 0 deletions core/rawdb/accessors_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,25 @@ func DeleteSkeletonHeader(db ethdb.KeyValueWriter, number uint64) {
log.Crit("Failed to delete skeleton header", "err", err)
}
}

const (
StateSyncUnknown = uint8(0) // flags the state snap sync is unknown
StateSyncRunning = uint8(1) // flags the state snap sync is not completed yet
StateSyncFinished = uint8(2) // flags the state snap sync is completed
)

// ReadSnapSyncStatusFlag retrieves the state snap sync status flag.
func ReadSnapSyncStatusFlag(db ethdb.KeyValueReader) uint8 {
blob, err := db.Get(snapSyncStatusFlagKey)
if err != nil || len(blob) != 1 {
return StateSyncUnknown
}
return blob[0]
}

// WriteSnapSyncStatusFlag stores the state snap sync status flag into database.
func WriteSnapSyncStatusFlag(db ethdb.KeyValueWriter, flag uint8) {
if err := db.Put(snapSyncStatusFlagKey, []byte{flag}); err != nil {
log.Crit("Failed to store sync status flag", "err", err)
}
}
2 changes: 1 addition & 1 deletion core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
lastPivotKey, fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotJournalKey,
snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey,
persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey,
persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey,
} {
if bytes.Equal(key, meta) {
metadata.Add(size)
Expand Down
3 changes: 3 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ var (
// transitionStatusKey tracks the eth2 transition status.
transitionStatusKey = []byte("eth2-transition")

// snapSyncStatusFlagKey flags that status of snap sync.
snapSyncStatusFlagKey = []byte("SnapSyncStatus")

// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
Expand Down
6 changes: 6 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,13 @@ func (p *BlobPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.Addr
return err
}
}
// Initialize the state with head block, or fallback to empty one in
// case the head state is not available(might occur when node is not
// fully synced).
state, err := p.chain.StateAt(head.Root)
if err != nil {
state, err = p.chain.StateAt(types.EmptyRootHash)
}
if err != nil {
return err
}
Expand Down
15 changes: 14 additions & 1 deletion core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,20 @@ func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool

// Set the basic pool parameters
pool.gasTip.Store(gasTip)
pool.reset(nil, head)

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available(might occur when node is not
// fully synced).
statedb, err := pool.chain.StateAt(head.Root)
if err != nil {
statedb, err = pool.chain.StateAt(types.EmptyRootHash)
}
if err != nil {
return err
}
pool.currentHead.Store(head)
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)

// Start the reorg loop early, so it can handle requests generated during
// journal loading.
Expand Down
10 changes: 8 additions & 2 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,10 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B
return nil, nil, errors.New("header not found")
}
stateDb, err := b.eth.BlockChain().StateAt(header.Root)
return stateDb, header, err
if err != nil {
return nil, nil, err
}
return stateDb, header, nil
}

func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) {
Expand All @@ -230,7 +233,10 @@ func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockN
return nil, nil, errors.New("hash is not currently canonical")
}
stateDb, err := b.eth.BlockChain().StateAt(header.Root)
return stateDb, header, err
if err != nil {
return nil, nil, err
}
return stateDb, header, nil
}
return nil, nil, errors.New("invalid arguments; neither block nor hash specified")
}
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func (s *Ethereum) Engine() consensus.Engine { return s.engine }
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
func (s *Ethereum) IsListening() bool { return true } // Always listening
func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader }
func (s *Ethereum) Synced() bool { return s.handler.acceptTxs.Load() }
func (s *Ethereum) Synced() bool { return s.handler.synced.Load() }
func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() }
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
Expand Down
4 changes: 3 additions & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
// subsequent state reads, explicitly disable the trie database and state
// syncer is responsible to address and correct any state missing.
if d.blockchain.TrieDB().Scheme() == rawdb.PathScheme {
d.blockchain.TrieDB().Reset(types.EmptyRootHash)
if err := d.blockchain.TrieDB().Disable(); err != nil {
return err
}
}
// Snap sync uses the snapshot namespace to store potentially flaky data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean-
Expand Down
50 changes: 23 additions & 27 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ type handler struct {
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node

snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
acceptTxs atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)

database ethdb.Database
txpool txPool
Expand Down Expand Up @@ -164,32 +164,24 @@ func newHandler(config *handlerConfig) (*handler, error) {
fullBlock, snapBlock := h.chain.CurrentBlock(), h.chain.CurrentSnapBlock()
if fullBlock.Number.Uint64() == 0 && snapBlock.Number.Uint64() > 0 {
h.snapSync.Store(true)
log.Warn("Switch sync mode from full sync to snap sync")
log.Warn("Switch sync mode from full sync to snap sync", "reason", "snap sync incomplete")
} else if !h.chain.HasState(fullBlock.Root) {
h.snapSync.Store(true)
log.Warn("Switch sync mode from full sync to snap sync", "reason", "head state missing")
}
} else {
if h.chain.CurrentBlock().Number.Uint64() > 0 {
head := h.chain.CurrentBlock()
if head.Number.Uint64() > 0 && h.chain.HasState(head.Root) {
// Print warning log if database is not empty to run snap sync.
log.Warn("Switch sync mode from snap sync to full sync")
log.Warn("Switch sync mode from snap sync to full sync", "reason", "snap sync complete")
} else {
// If snap sync was requested and our database is empty, grant it
h.snapSync.Store(true)
log.Info("Enabled snap sync", "head", head.Number, "hash", head.Hash())
}
}
// If sync succeeds, pass a callback to potentially disable snap sync mode
// and enable transaction propagation.
success := func() {
// If we were running snap sync and it finished, disable doing another
// round on next sync cycle
if h.snapSync.Load() {
log.Info("Snap sync complete, auto disabling")
h.snapSync.Store(false)
}
// If we've successfully finished a sync cycle, accept transactions from
// the network
h.enableSyncedFeatures()
}
// Construct the downloader (long sync)
h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, success)
h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, h.enableSyncedFeatures)
if ttd := h.chain.Config().TerminalTotalDifficulty; ttd != nil {
if h.chain.Config().TerminalTotalDifficultyPassed {
log.Info("Chain post-merge, sync via beacon client")
Expand Down Expand Up @@ -246,8 +238,8 @@ func newHandler(config *handlerConfig) (*handler, error) {
// accept each others' blocks until a restart. Unfortunately we haven't figured
// out a way yet where nodes can decide unilaterally whether the network is new
// or not. This should be fixed if we figure out a solution.
if h.snapSync.Load() {
log.Warn("Snap syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
if !h.synced.Load() {
log.Warn("Syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil
}
if h.merger.TDDReached() {
Expand All @@ -273,11 +265,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return 0, nil
}
n, err := h.chain.InsertChain(blocks)
if err == nil {
h.enableSyncedFeatures() // Mark initial sync done on any fetcher import
}
return n, err
return h.chain.InsertChain(blocks)
}
// SYSCOIN
if h.chain.GetChainConfig().SyscoinBlock == nil {
Expand Down Expand Up @@ -698,7 +686,15 @@ func (h *handler) txBroadcastLoop() {
// enableSyncedFeatures enables the post-sync functionalities when the initial
// sync is finished.
func (h *handler) enableSyncedFeatures() {
h.acceptTxs.Store(true)
// Mark the local node as synced.
h.synced.Store(true)

// If we were running snap sync and it finished, disable doing another
// round on next sync cycle
if h.snapSync.Load() {
log.Info("Snap sync complete, auto disabling")
h.snapSync.Store(false)
}
if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
}
Expand Down

0 comments on commit e703cdb

Please sign in to comment.