Skip to content

Commit

Permalink
core: make reorg use less memory
Browse files Browse the repository at this point in the history
  • Loading branch information
MariusVanDerWijden committed May 23, 2022
1 parent 2b0d0ce commit 8a048dc
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 18 deletions.
53 changes: 35 additions & 18 deletions core/blockchain.go
Expand Up @@ -1960,13 +1960,18 @@ func mergeLogs(logs [][]*types.Log, reverse bool) []*types.Log {
// Note the new head block won't be processed here, callers need to handle it
// externally.
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
type block struct {
hash common.Hash
number uint64
}

var (
newChain types.Blocks
newChain []block
oldChain types.Blocks
commonBlock *types.Block

deletedTxs types.Transactions
addedTxs types.Transactions
deletedTxs []common.Hash
addedTxs []common.Hash

deletedLogs [][]*types.Log
rebirthLogs [][]*types.Log
Expand All @@ -1976,7 +1981,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// Old chain is longer, gather all transactions and logs as deleted ones
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}

// Collect deleted logs for notification
logs := bc.collectLogs(oldBlock.Hash(), true)
Expand All @@ -1987,7 +1994,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
} else {
// New chain is longer, stash all blocks away for subsequent insertion
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
newChain = append(newChain, newBlock)
newChain = append(newChain, block{hash: newBlock.Hash(), number: newBlock.NumberU64()})
}
}
if oldBlock == nil {
Expand All @@ -2006,14 +2013,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
}
// Remove an old block as well as stash away a new block
oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}

// Collect deleted logs for notification
logs := bc.collectLogs(oldBlock.Hash(), true)
if len(logs) > 0 {
deletedLogs = append(deletedLogs, logs)
}
newChain = append(newChain, newBlock)
newChain = append(newChain, block{hash: newBlock.Hash(), number: newBlock.NumberU64()})

// Step back with both chains
oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1)
Expand All @@ -2034,14 +2043,14 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
logFn = log.Warn
}
logFn(msg, "number", commonBlock.Number(), "hash", commonBlock.Hash(),
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].hash)
blockReorgAddMeter.Mark(int64(len(newChain)))
blockReorgDropMeter.Mark(int64(len(oldChain)))
blockReorgMeter.Mark(1)
} else if len(newChain) > 0 {
// Special case happens in the post merge stage that current head is
// the ancestor of new head while these two blocks are not consecutive
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].NumberU64(), "hash", newChain[0].Hash())
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].number, "hash", newChain[0].hash)
blockReorgAddMeter.Mark(int64(len(newChain)))
} else {
// len(newChain) == 0 && len(oldChain) > 0
Expand All @@ -2052,21 +2061,20 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// taking care of the proper incremental order.
for i := len(newChain) - 1; i >= 1; i-- {
// Insert the block in the canonical way, re-writing history
bc.writeHeadBlock(newChain[i])
block := bc.GetBlock(newChain[i].hash, newChain[i].number)

// Collect reborn logs due to chain reorg
logs := bc.collectLogs(newChain[i].Hash(), false)
if len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs)
}
bc.writeHeadBlock(block)
// Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...)
for _, tx := range block.Transactions() {
addedTxs = append(addedTxs, tx.Hash())
}
}

// Delete useless indexes right now which includes the non-canonical
// transaction indexes, canonical chain indexes which above the head.
indexesBatch := bc.db.NewBatch()
for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
for _, tx := range types.TxDifferenceHash(deletedTxs, addedTxs) {
rawdb.DeleteTxLookupEntry(indexesBatch, tx)
}
// Delete any canonical number assignments above the new head
number := bc.CurrentBlock().NumberU64()
Expand All @@ -2080,6 +2088,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
if err := indexesBatch.Write(); err != nil {
log.Crit("Failed to delete useless indexes", "err", err)
}

// Collect the logs
for i := len(newChain) - 1; i >= 1; i-- {
// Collect reborn logs due to chain reorg
logs := bc.collectLogs(newChain[i].hash, false)
if len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs)
}
}
// If any logs need to be fired, do it now. In theory we could avoid creating
// this goroutine if there are no events to fire, but realistcally that only
// ever happens if we're reorging empty blocks, which will only happen on idle
Expand Down
18 changes: 18 additions & 0 deletions core/types/transaction.go
Expand Up @@ -432,6 +432,24 @@ func TxDifference(a, b Transactions) Transactions {
return keep
}

// TxDifferenceHash returns a new set which is the difference between a and b.
func TxDifferenceHash(a, b []common.Hash) []common.Hash {
keep := make([]common.Hash, 0, len(a))

remove := make(map[common.Hash]struct{})
for _, tx := range b {
remove[tx] = struct{}{}
}

for _, tx := range a {
if _, ok := remove[tx]; !ok {
keep = append(keep, tx)
}
}

return keep
}

// TxByNonce implements the sort interface to allow sorting a list of transactions
// by their nonces. This is usually only useful for sorting transactions from a
// single account, otherwise a nonce comparison doesn't make much sense.
Expand Down

0 comments on commit 8a048dc

Please sign in to comment.