From 682ee820fa116b8a081d269c9d155ec19411959b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 2 May 2024 11:18:27 +0300 Subject: [PATCH] core/state: parallelise parts of state commit (#29681) * core/state, internal/workerpool: parallelize parts of state commit * core, internal: move workerpool into syncx * core/state: use errgroups, commit accounts concurrently * core: resurrect detailed commit timers to almost-accuracy --- core/blockchain.go | 2 +- core/state/state_object.go | 3 + core/state/statedb.go | 126 +++++++++++++++++++++++++------------ 3 files changed, 89 insertions(+), 42 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 9de4baccca75a..654b4fbdcac20 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1963,7 +1963,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them - blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits) + blockWriteTimer.Update(time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits) blockInsertTimer.UpdateSince(start) return &blockProcessingResult{usedGas: usedGas, procTime: proctime, status: status}, nil diff --git a/core/state/state_object.go b/core/state/state_object.go index 1454f7a459a59..d75ba01376bde 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -403,6 +403,9 @@ func (s *stateObject) updateRoot() { // commit obtains a set of dirty storage trie nodes and updates the account data. // The returned set can be nil if nothing to commit. This function assumes all // storage mutations have already been flushed into trie by updateRoot. +// +// Note, commit may run concurrently across all the state objects. Do not assume +// thread-safe access to the statedb. func (s *stateObject) commit() (*trienode.NodeSet, error) { // Short circuit if trie is not even loaded, don't bother with committing anything if s.trie == nil { diff --git a/core/state/statedb.go b/core/state/statedb.go index 6d9cc907e03f4..66cfc8f05a320 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -23,6 +23,7 @@ import ( "math/big" "slices" "sort" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -37,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/trie/triestate" "github.com/holiman/uint256" + "golang.org/x/sync/errgroup" ) type revision struct { @@ -1146,66 +1148,108 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er storageTrieNodesUpdated int storageTrieNodesDeleted int nodes = trienode.NewMergedNodeSet() - codeWriter = s.db.DiskDB().NewBatch() ) // Handle all state deletions first if err := s.handleDestruction(nodes); err != nil { return common.Hash{}, err } - // Handle all state updates afterwards + // Handle all state updates afterwards, concurrently to one another to shave + // off some milliseconds from the commit operation. Also accumulate the code + // writes to run in parallel with the computations. start := time.Now() + var ( + code = s.db.DiskDB().NewBatch() + lock sync.Mutex + root common.Hash + workers errgroup.Group + ) + // Schedule the account trie first since that will be the biggest, so give + // it the most time to crunch. + // + // TODO(karalabe): This account trie commit is *very* heavy. 5-6ms at chain + // heads, which seems excessive given that it doesn't do hashing, it just + // shuffles some data. For comparison, the *hashing* at chain head is 2-3ms. + // We need to investigate what's happening as it seems something's wonky. + // Obviously it's not an end of the world issue, just something the original + // code didn't anticipate for. + workers.Go(func() error { + // Write the account trie changes, measuring the amount of wasted time + newroot, set, err := s.trie.Commit(true) + if err != nil { + return err + } + root = newroot + + // Merge the dirty nodes of account trie into global set + lock.Lock() + defer lock.Unlock() + + if set != nil { + if err = nodes.Merge(set); err != nil { + return err + } + accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size() + } + s.AccountCommits = time.Since(start) + return nil + }) + // Schedule each of the storage tries that need to be updated, so they can + // run concurrently to one another. + // + // TODO(karalabe): Experimentally, the account commit takes approximately the + // same time as all the storage commits combined, so we could maybe only have + // 2 threads in total. But that kind of depends on the account commit being + // more expensive than it should be, so let's fix that and revisit this todo. for addr, op := range s.mutations { if op.isDelete() { continue } - obj := s.stateObjects[addr] - // Write any contract code associated with the state object + obj := s.stateObjects[addr] if obj.code != nil && obj.dirtyCode { - rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) + rawdb.WriteCode(code, common.BytesToHash(obj.CodeHash()), obj.code) obj.dirtyCode = false } - // Write any storage changes in the state object to its storage trie - set, err := obj.commit() - if err != nil { - return common.Hash{}, err - } - // Merge the dirty nodes of storage trie into global set. It is possible - // that the account was destructed and then resurrected in the same block. - // In this case, the node set is shared by both accounts. - if set != nil { - if err := nodes.Merge(set); err != nil { - return common.Hash{}, err + // Run the storage updates concurrently to one another + workers.Go(func() error { + // Write any storage changes in the state object to its storage trie + set, err := obj.commit() + if err != nil { + return err } - updates, deleted := set.Size() - storageTrieNodesUpdated += updates - storageTrieNodesDeleted += deleted - } + // Merge the dirty nodes of storage trie into global set. It is possible + // that the account was destructed and then resurrected in the same block. + // In this case, the node set is shared by both accounts. + lock.Lock() + defer lock.Unlock() + + if set != nil { + if err = nodes.Merge(set); err != nil { + return err + } + updates, deleted := set.Size() + storageTrieNodesUpdated += updates + storageTrieNodesDeleted += deleted + } + s.StorageCommits = time.Since(start) // overwrite with the longest storage commit runtime + return nil + }) } - s.StorageCommits += time.Since(start) - - if codeWriter.ValueSize() > 0 { - if err := codeWriter.Write(); err != nil { - log.Crit("Failed to commit dirty codes", "error", err) + // Schedule the code commits to run concurrently too. This shouldn't really + // take much since we don't often commit code, but since it's disk access, + // it's always yolo. + workers.Go(func() error { + if code.ValueSize() > 0 { + if err := code.Write(); err != nil { + log.Crit("Failed to commit dirty codes", "error", err) + } } - } - // Write the account trie changes, measuring the amount of wasted time - start = time.Now() - - root, set, err := s.trie.Commit(true) - if err != nil { + return nil + }) + // Wait for everything to finish and update the metrics + if err := workers.Wait(); err != nil { return common.Hash{}, err } - // Merge the dirty nodes of account trie into global set - if set != nil { - if err := nodes.Merge(set); err != nil { - return common.Hash{}, err - } - accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size() - } - // Report the commit metrics - s.AccountCommits += time.Since(start) - accountUpdatedMeter.Mark(int64(s.AccountUpdated)) storageUpdatedMeter.Mark(int64(s.StorageUpdated)) accountDeletedMeter.Mark(int64(s.AccountDeleted))