Skip to content

Commit

Permalink
eth: request id dispatcher and direct req/reply APIs (#23576)
Browse files Browse the repository at this point in the history
* eth: request ID based message dispatcher

* eth: fix dispatcher cancellation, rework fetchers idleness tracker

* eth/downloader: drop peers who refuse to serve advertised chains
  • Loading branch information
karalabe committed Nov 26, 2021
1 parent 3038e48 commit c10a0a6
Show file tree
Hide file tree
Showing 52 changed files with 3,195 additions and 3,382 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Expand Up @@ -215,7 +215,7 @@ var (
defaultSyncMode = ethconfig.Defaults.SyncMode
SyncModeFlag = TextMarshalerFlag{
Name: "syncmode",
Usage: `Blockchain sync mode ("fast", "full", "snap" or "light")`,
Usage: `Blockchain sync mode ("snap", "full" or "light")`,
Value: &defaultSyncMode,
}
GCModeFlag = cli.StringFlag{
Expand Down
24 changes: 9 additions & 15 deletions core/blockchain.go
Expand Up @@ -629,9 +629,9 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
return rootNumber, bc.loadLastState()
}

// FastSyncCommitHead sets the current head block to the one defined by the hash
// SnapSyncCommitHead sets the current head block to the one defined by the hash
// irrelevant what the chain contents were prior.
func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error {
// Make sure that both the block as well at its state trie exists
block := bc.GetBlockByHash(hash)
if block == nil {
Expand Down Expand Up @@ -736,30 +736,24 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) writeHeadBlock(block *types.Block) {
// If the block is on a side chain or an unknown one, force other heads onto it too
updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash()

// Add the block to the canonical chain number scheme and mark as the head
batch := bc.db.NewBatch()
rawdb.WriteHeadHeaderHash(batch, block.Hash())
rawdb.WriteHeadFastBlockHash(batch, block.Hash())
rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
rawdb.WriteTxLookupEntriesByBlock(batch, block)
rawdb.WriteHeadBlockHash(batch, block.Hash())

// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
rawdb.WriteHeadHeaderHash(batch, block.Hash())
rawdb.WriteHeadFastBlockHash(batch, block.Hash())
}
// Flush the whole batch into the disk, exit the node if failed
if err := batch.Write(); err != nil {
log.Crit("Failed to update chain indexes and markers", "err", err)
}
// Update all in-memory chain markers in the last step
if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))
}
bc.hc.SetCurrentHeader(block.Header())

bc.currentFastBlock.Store(block)
headFastBlockGauge.Update(int64(block.NumberU64()))

bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
}
Expand Down
224 changes: 112 additions & 112 deletions core/blockchain_repair_test.go

Large diffs are not rendered by default.

240 changes: 120 additions & 120 deletions core/blockchain_sethead_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion core/blockchain_test.go
Expand Up @@ -2637,7 +2637,7 @@ func TestTransactionIndices(t *testing.T) {
}
}

func TestSkipStaleTxIndicesInFastSync(t *testing.T) {
func TestSkipStaleTxIndicesInSnapSync(t *testing.T) {
// Configure and generate a sample block chain
var (
gendb = rawdb.NewMemoryDatabase()
Expand Down
22 changes: 22 additions & 0 deletions core/chain_makers.go
Expand Up @@ -155,6 +155,28 @@ func (b *BlockGen) TxNonce(addr common.Address) uint64 {

// AddUncle adds an uncle header to the generated block.
func (b *BlockGen) AddUncle(h *types.Header) {
// The uncle will have the same timestamp and auto-generated difficulty
h.Time = b.header.Time

var parent *types.Header
for i := b.i - 1; i >= 0; i-- {
if b.chain[i].Hash() == h.ParentHash {
parent = b.chain[i].Header()
break
}
}
chainreader := &fakeChainReader{config: b.config}
h.Difficulty = b.engine.CalcDifficulty(chainreader, b.header.Time, parent)

// The gas limit and price should be derived from the parent
h.GasLimit = parent.GasLimit
if b.config.IsLondon(h.Number) {
h.BaseFee = misc.CalcBaseFee(b.config, parent)
if !b.config.IsLondon(parent.Number) {
parentGasLimit := parent.GasLimit * params.ElasticityMultiplier
h.GasLimit = CalcGasLimit(parentGasLimit, parentGasLimit)
}
}
b.uncles = append(b.uncles, h)
}

Expand Down
18 changes: 0 additions & 18 deletions core/rawdb/accessors_chain.go
Expand Up @@ -242,24 +242,6 @@ func WriteLastPivotNumber(db ethdb.KeyValueWriter, pivot uint64) {
}
}

// ReadFastTrieProgress retrieves the number of tries nodes fast synced to allow
// reporting correct numbers across restarts.
func ReadFastTrieProgress(db ethdb.KeyValueReader) uint64 {
data, _ := db.Get(fastTrieProgressKey)
if len(data) == 0 {
return 0
}
return new(big.Int).SetBytes(data).Uint64()
}

// WriteFastTrieProgress stores the fast sync trie process counter to support
// retrieving it across restarts.
func WriteFastTrieProgress(db ethdb.KeyValueWriter, count uint64) {
if err := db.Put(fastTrieProgressKey, new(big.Int).SetUint64(count).Bytes()); err != nil {
log.Crit("Failed to store fast sync trie progress", "err", err)
}
}

// ReadTxIndexTail retrieves the number of oldest indexed block
// whose transaction indices has been indexed. If the corresponding entry
// is non-existent in database it means the indexing has been finished.
Expand Down
8 changes: 0 additions & 8 deletions core/rawdb/accessors_snapshot.go
Expand Up @@ -208,11 +208,3 @@ func WriteSnapshotSyncStatus(db ethdb.KeyValueWriter, status []byte) {
log.Crit("Failed to store snapshot sync status", "err", err)
}
}

// DeleteSnapshotSyncStatus deletes the serialized sync status saved at the last
// shutdown
func DeleteSnapshotSyncStatus(db ethdb.KeyValueWriter) {
if err := db.Delete(snapshotSyncStatusKey); err != nil {
log.Crit("Failed to remove snapshot sync status", "err", err)
}
}

0 comments on commit c10a0a6

Please sign in to comment.