Skip to content

Commit

Permalink
core/state: parallelise parts of state commit (#29681)
Browse files Browse the repository at this point in the history
* core/state, internal/workerpool: parallelize parts of state commit

* core, internal: move workerpool into syncx

* core/state: use errgroups, commit accounts concurrently

* core: resurrect detailed commit timers to almost-accuracy
  • Loading branch information
karalabe committed May 2, 2024
1 parent 9f96e07 commit 682ee82
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 42 deletions.
2 changes: 1 addition & 1 deletion core/blockchain.go
Expand Up @@ -1963,7 +1963,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them

blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockWriteTimer.Update(time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockInsertTimer.UpdateSince(start)

return &blockProcessingResult{usedGas: usedGas, procTime: proctime, status: status}, nil
Expand Down
3 changes: 3 additions & 0 deletions core/state/state_object.go
Expand Up @@ -403,6 +403,9 @@ func (s *stateObject) updateRoot() {
// commit obtains a set of dirty storage trie nodes and updates the account data.
// The returned set can be nil if nothing to commit. This function assumes all
// storage mutations have already been flushed into trie by updateRoot.
//
// Note, commit may run concurrently across all the state objects. Do not assume
// thread-safe access to the statedb.
func (s *stateObject) commit() (*trienode.NodeSet, error) {
// Short circuit if trie is not even loaded, don't bother with committing anything
if s.trie == nil {
Expand Down
126 changes: 85 additions & 41 deletions core/state/statedb.go
Expand Up @@ -23,6 +23,7 @@ import (
"math/big"
"slices"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/trie/triestate"
"github.com/holiman/uint256"
"golang.org/x/sync/errgroup"
)

type revision struct {
Expand Down Expand Up @@ -1146,66 +1148,108 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er
storageTrieNodesUpdated int
storageTrieNodesDeleted int
nodes = trienode.NewMergedNodeSet()
codeWriter = s.db.DiskDB().NewBatch()
)
// Handle all state deletions first
if err := s.handleDestruction(nodes); err != nil {
return common.Hash{}, err
}
// Handle all state updates afterwards
// Handle all state updates afterwards, concurrently to one another to shave
// off some milliseconds from the commit operation. Also accumulate the code
// writes to run in parallel with the computations.
start := time.Now()
var (
code = s.db.DiskDB().NewBatch()
lock sync.Mutex
root common.Hash
workers errgroup.Group
)
// Schedule the account trie first since that will be the biggest, so give
// it the most time to crunch.
//
// TODO(karalabe): This account trie commit is *very* heavy. 5-6ms at chain
// heads, which seems excessive given that it doesn't do hashing, it just
// shuffles some data. For comparison, the *hashing* at chain head is 2-3ms.
// We need to investigate what's happening as it seems something's wonky.
// Obviously it's not an end of the world issue, just something the original
// code didn't anticipate for.
workers.Go(func() error {
// Write the account trie changes, measuring the amount of wasted time
newroot, set, err := s.trie.Commit(true)
if err != nil {
return err
}
root = newroot

// Merge the dirty nodes of account trie into global set
lock.Lock()
defer lock.Unlock()

if set != nil {
if err = nodes.Merge(set); err != nil {
return err
}
accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size()
}
s.AccountCommits = time.Since(start)
return nil
})
// Schedule each of the storage tries that need to be updated, so they can
// run concurrently to one another.
//
// TODO(karalabe): Experimentally, the account commit takes approximately the
// same time as all the storage commits combined, so we could maybe only have
// 2 threads in total. But that kind of depends on the account commit being
// more expensive than it should be, so let's fix that and revisit this todo.
for addr, op := range s.mutations {
if op.isDelete() {
continue
}
obj := s.stateObjects[addr]

// Write any contract code associated with the state object
obj := s.stateObjects[addr]
if obj.code != nil && obj.dirtyCode {
rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code)
rawdb.WriteCode(code, common.BytesToHash(obj.CodeHash()), obj.code)
obj.dirtyCode = false
}
// Write any storage changes in the state object to its storage trie
set, err := obj.commit()
if err != nil {
return common.Hash{}, err
}
// Merge the dirty nodes of storage trie into global set. It is possible
// that the account was destructed and then resurrected in the same block.
// In this case, the node set is shared by both accounts.
if set != nil {
if err := nodes.Merge(set); err != nil {
return common.Hash{}, err
// Run the storage updates concurrently to one another
workers.Go(func() error {
// Write any storage changes in the state object to its storage trie
set, err := obj.commit()
if err != nil {
return err
}
updates, deleted := set.Size()
storageTrieNodesUpdated += updates
storageTrieNodesDeleted += deleted
}
// Merge the dirty nodes of storage trie into global set. It is possible
// that the account was destructed and then resurrected in the same block.
// In this case, the node set is shared by both accounts.
lock.Lock()
defer lock.Unlock()

if set != nil {
if err = nodes.Merge(set); err != nil {
return err
}
updates, deleted := set.Size()
storageTrieNodesUpdated += updates
storageTrieNodesDeleted += deleted
}
s.StorageCommits = time.Since(start) // overwrite with the longest storage commit runtime
return nil
})
}
s.StorageCommits += time.Since(start)

if codeWriter.ValueSize() > 0 {
if err := codeWriter.Write(); err != nil {
log.Crit("Failed to commit dirty codes", "error", err)
// Schedule the code commits to run concurrently too. This shouldn't really
// take much since we don't often commit code, but since it's disk access,
// it's always yolo.
workers.Go(func() error {
if code.ValueSize() > 0 {
if err := code.Write(); err != nil {
log.Crit("Failed to commit dirty codes", "error", err)
}
}
}
// Write the account trie changes, measuring the amount of wasted time
start = time.Now()

root, set, err := s.trie.Commit(true)
if err != nil {
return nil
})
// Wait for everything to finish and update the metrics
if err := workers.Wait(); err != nil {
return common.Hash{}, err
}
// Merge the dirty nodes of account trie into global set
if set != nil {
if err := nodes.Merge(set); err != nil {
return common.Hash{}, err
}
accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size()
}
// Report the commit metrics
s.AccountCommits += time.Since(start)

accountUpdatedMeter.Mark(int64(s.AccountUpdated))
storageUpdatedMeter.Mark(int64(s.StorageUpdated))
accountDeletedMeter.Mark(int64(s.AccountDeleted))
Expand Down

0 comments on commit 682ee82

Please sign in to comment.