Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: use less memory during reorgs #24616

Merged
merged 4 commits into from May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 25 additions & 13 deletions core/blockchain.go
Expand Up @@ -1965,8 +1965,8 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
oldChain types.Blocks
commonBlock *types.Block

deletedTxs types.Transactions
addedTxs types.Transactions
deletedTxs []common.Hash
addedTxs []common.Hash

deletedLogs [][]*types.Log
rebirthLogs [][]*types.Log
Expand All @@ -1976,7 +1976,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// Old chain is longer, gather all transactions and logs as deleted ones
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}

// Collect deleted logs for notification
logs := bc.collectLogs(oldBlock.Hash(), true)
Expand Down Expand Up @@ -2006,7 +2008,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
}
// Remove an old block as well as stash away a new block
oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
for _, tx := range oldBlock.Transactions() {
deletedTxs = append(deletedTxs, tx.Hash())
}

// Collect deleted logs for notification
logs := bc.collectLogs(oldBlock.Hash(), true)
Expand All @@ -2025,6 +2029,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return fmt.Errorf("invalid new chain")
}
}

// Ensure the user sees large reorgs
if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Info
Expand All @@ -2041,7 +2046,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
} else if len(newChain) > 0 {
// Special case happens in the post merge stage that current head is
// the ancestor of new head while these two blocks are not consecutive
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].NumberU64(), "hash", newChain[0].Hash())
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number(), "hash", newChain[0].Hash())
blockReorgAddMeter.Mark(int64(len(newChain)))
} else {
// len(newChain) == 0 && len(oldChain) > 0
Expand All @@ -2054,19 +2059,17 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// Insert the block in the canonical way, re-writing history
bc.writeHeadBlock(newChain[i])

// Collect reborn logs due to chain reorg
logs := bc.collectLogs(newChain[i].Hash(), false)
if len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs)
}
// Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...)
for _, tx := range newChain[i].Transactions() {
addedTxs = append(addedTxs, tx.Hash())
}
}

// Delete useless indexes right now which includes the non-canonical
// transaction indexes, canonical chain indexes which above the head.
indexesBatch := bc.db.NewBatch()
for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
for _, tx := range types.HashDifference(deletedTxs, addedTxs) {
rawdb.DeleteTxLookupEntry(indexesBatch, tx)
}
// Delete any canonical number assignments above the new head
number := bc.CurrentBlock().NumberU64()
Expand All @@ -2080,6 +2083,15 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
if err := indexesBatch.Write(); err != nil {
log.Crit("Failed to delete useless indexes", "err", err)
}

// Collect the logs
for i := len(newChain) - 1; i >= 1; i-- {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's a humongous amount of blocks, maybe we shouldn't do this. If we're reorging >10K blocks, do we really want to collect and emit 3M logs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think we maybe should limit this -- cap the logs at some reasonable number, like 1K ?

// Collect reborn logs due to chain reorg
logs := bc.collectLogs(newChain[i].Hash(), false)
if len(logs) > 0 {
rebirthLogs = append(rebirthLogs, logs)
}
}
// If any logs need to be fired, do it now. In theory we could avoid creating
// this goroutine if there are no events to fire, but realistcally that only
// ever happens if we're reorging empty blocks, which will only happen on idle
Expand Down
18 changes: 18 additions & 0 deletions core/types/transaction.go
Expand Up @@ -432,6 +432,24 @@ func TxDifference(a, b Transactions) Transactions {
return keep
}

// HashDifference returns a new set which is the difference between a and b.
func HashDifference(a, b []common.Hash) []common.Hash {
keep := make([]common.Hash, 0, len(a))

remove := make(map[common.Hash]struct{})
for _, hash := range b {
remove[hash] = struct{}{}
}

for _, hash := range a {
if _, ok := remove[hash]; !ok {
keep = append(keep, hash)
}
}

return keep
}

// TxByNonce implements the sort interface to allow sorting a list of transactions
// by their nonces. This is usually only useful for sorting transactions from a
// single account, otherwise a nonce comparison doesn't make much sense.
Expand Down