Skip to content

Commit

Permalink
Merge pull request #238 from phatngluu/parallel_commit
Browse files Browse the repository at this point in the history
Parallel hashing
  • Loading branch information
rameight committed Dec 29, 2022
2 parents ea36ade + a0bac1f commit 3f80a2a
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 16 deletions.
37 changes: 28 additions & 9 deletions trie/hasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func (b *sliceBuffer) Reset() {
// hasher is a type used for the trie Hash operation. A hasher has some
// internal preallocated temp space
type hasher struct {
sha keccakState
tmp sliceBuffer
sha keccakState
tmp sliceBuffer
parallel bool // Whether to use paralallel threads when hashing
}

// hasherPool holds pureHashers
Expand All @@ -60,8 +61,9 @@ var hasherPool = sync.Pool{
},
}

func newHasher() *hasher {
func newHasher(parallel bool) *hasher {
h := hasherPool.Get().(*hasher)
h.parallel = parallel
return h
}

Expand Down Expand Up @@ -126,14 +128,31 @@ func (h *hasher) hashFullNodeChildren(n *fullNode) (collapsed *fullNode, cached
// Hash the full node's children, caching the newly hashed subtrees
cached = n.copy()
collapsed = n.copy()
for i := 0; i < 16; i++ {
if child := n.Children[i]; child != nil {
collapsed.Children[i], cached.Children[i] = h.hash(child, false)
} else {
collapsed.Children[i] = nilValueNode
if h.parallel {
var wg sync.WaitGroup
wg.Add(16)
for i := 0; i < 16; i++ {
go func(i int) {
hasher := newHasher(false)
if child := n.Children[i]; child != nil {
collapsed.Children[i], cached.Children[i] = hasher.hash(child, false)
} else {
collapsed.Children[i] = nilValueNode
}
returnHasherToPool(hasher)
wg.Done()
}(i)
}
wg.Wait()
} else {
for i := 0; i < 16; i++ {
if child := n.Children[i]; child != nil {
collapsed.Children[i], cached.Children[i] = h.hash(child, false)
} else {
collapsed.Children[i] = nilValueNode
}
}
}
cached.Children[16] = n.Children[16]
return collapsed, cached
}

Expand Down
4 changes: 2 additions & 2 deletions trie/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ func (it *nodeIterator) LeafBlob() []byte {
func (it *nodeIterator) LeafProof() [][]byte {
if len(it.stack) > 0 {
if _, ok := it.stack[len(it.stack)-1].node.(valueNode); ok {
hasher := newHasher()
hasher := newHasher(false)
defer returnHasherToPool(hasher)
proofs := make([][]byte, 0, len(it.stack))

for i, item := range it.stack[:len(it.stack)-1] {
// Gather nodes that end up as hash nodes (or the root)
// Gather nodes that end up as hash nodes (or the root)
node, hashed := hasher.proofHash(item.node)
if _, ok := hashed.(hashNode); ok || i == 0 {
enc, _ := rlp.EncodeToBytes(node)
Expand Down
2 changes: 1 addition & 1 deletion trie/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (t *Trie) Prove(key []byte, fromLevel uint, proofDb kaidb.KeyValueWriter) e
panic(fmt.Sprintf("%T: invalid node: %v", tn, tn))
}
}
hasher := newHasher()
hasher := newHasher(false)
defer returnHasherToPool(hasher)

for i, n := range nodes {
Expand Down
2 changes: 1 addition & 1 deletion trie/secure_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (t *SecureTrie) NodeIterator(start []byte) NodeIterator {
// The caller must not hold onto the return value because it will become
// invalid on the next call to hashKey or secKey.
func (t *SecureTrie) hashKey(key []byte) []byte {
h := newHasher()
h := newHasher(false)
h.sha.Reset()
h.sha.Write(key)
buf := h.sha.Sum(t.hashKeyBuf[:0])
Expand Down
14 changes: 11 additions & 3 deletions trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type LeafCallback func(leaf []byte, parent common.Hash) error
type Trie struct {
db *TrieDatabase
root node
// Keep track of the number leafs which have been inserted since the last
// hashing operation. This number will not directly map to the number of
// actually unhashed nodes
unhashed int
}

// newFlag returns the cache flag value for a newly created node.
Expand Down Expand Up @@ -163,6 +167,7 @@ func (t *Trie) Update(key, value []byte) {
//
// If a node was not found in the database, a MissingNodeError is returned.
func (t *Trie) TryUpdate(key, value []byte) error {
t.unhashed++
k := keybytesToHex(key)
if len(value) != 0 {
_, n, err := t.insert(t.root, nil, k, valueNode(value))
Expand Down Expand Up @@ -259,6 +264,7 @@ func (t *Trie) Delete(key []byte) {
// TryDelete removes any existing value for key from the trie.
// If a node was not found in the database, a MissingNodeError is returned.
func (t *Trie) TryDelete(key []byte) error {
t.unhashed++
k := keybytesToHex(key)
_, n, err := t.delete(t.root, nil, k)
if err != nil {
Expand Down Expand Up @@ -405,7 +411,7 @@ func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) {
// Hash returns the root hash of the trie. It does not write to the
// database and can be used even if the trie doesn't have one.
func (t *Trie) Hash() common.Hash {
hash, cached, _ := t.hashRoot(nil, nil)
hash, cached, _ := t.hashRoot(nil)
t.root = cached
return common.BytesToHash(hash.(hashNode))
}
Expand Down Expand Up @@ -456,13 +462,15 @@ func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
}

// hashRoot calculates the root hash of the given trie
func (t *Trie) hashRoot(db *TrieDatabase, onleaf LeafCallback) (node, node, error) {
func (t *Trie) hashRoot(db *TrieDatabase) (node, node, error) {
if t.root == nil {
return hashNode(emptyRoot.Bytes()), nil, nil
}
h := newHasher()
// If the number of changes is below 100, we let one thread handle it
h := newHasher(t.unhashed >= 100)
defer returnHasherToPool(h)
hashed, cached := h.hash(t.root, true)
t.unhashed = 0
return hashed, cached, nil
}

Expand Down

0 comments on commit 3f80a2a

Please sign in to comment.