
Commit

parallelized EVM TxPosition.BlockOffset migration
Txs positions processing	blocks=37776408 items=262,972,464
  no thread:  elapsed=50m0.817s
  10 threads: elapsed=42m43.202s
 100 threads: elapsed=42m37.924s
rus-alex committed Oct 29, 2023
1 parent 2ddbac4 commit 19c852c
Showing 1 changed file with 54 additions and 25 deletions.
gossip/store_migration.go
@@ -3,9 +3,12 @@ package gossip
 import (
     "errors"
     "fmt"
+    "sync"
+    "sync/atomic"
     "time"
 
     "github.com/Fantom-foundation/lachesis-base/hash"
+    "github.com/Fantom-foundation/lachesis-base/inter/idx"
     "github.com/Fantom-foundation/lachesis-base/kvdb"
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core/types"
@@ -150,48 +153,74 @@ func (s *Store) calculateUpgradeHeights() error {
     return nil
 }
 
-func (s *Store) fixTxPositionBlockOffset() error {
+func (s *Store) fixTxPositionBlockOffset() (err error) {
+    const parallels = 10
     var (
-        start     = time.Now()
-        processed = 0
+        wg    sync.WaitGroup
+        items = new(uint32)
     )
 
     receiptsTable, _ := s.dbs.OpenDB("evm/r")
     txPositionsTable, _ := s.dbs.OpenDB("evm/x")
 
+    processBlockReceipts := func(input <-chan []*types.ReceiptForStorage) {
+        defer wg.Done()
+        pos := new(evmstore.TxPosition)
+        for rr := range input {
+            for _, r := range rr {
+                key := r.TxHash.Bytes()
+                got := s.rlp.Get(txPositionsTable, key, pos)
+                if got == nil {
+                    continue
+                }
+                pos.BlockOffset = uint32(r.TransactionIndex)
+                s.rlp.Set(txPositionsTable, key, pos)
+
+                atomic.AddUint32(items, 1)
+            }
+        }
+    }
+
+    wg.Add(parallels)
+    threads := make([]chan []*types.ReceiptForStorage, parallels)
+    for i := range threads {
+        threads[i] = make(chan []*types.ReceiptForStorage, 10)
+        go processBlockReceipts(threads[i])
+    }
+
+    var (
+        block         idx.Block
+        start         = time.Now()
+        prevFlushTime = time.Now()
+    )
     it := receiptsTable.NewIterator(nil, nil)
     defer it.Release()
-    for it.Next() {
+    for n := 0; it.Next(); n++ {
+        block = idx.BytesToBlock(it.Key())
 
         var receiptsStorage []*types.ReceiptForStorage
         err := rlp.DecodeBytes(it.Value(), &receiptsStorage)
         if err != nil {
             s.Log.Crit("Failed to decode rlp", "err", err, "size", len(it.Value()))
         }
+        threads[n%parallels] <- receiptsStorage
 
-        var pos = new(evmstore.TxPosition)
-
-        for _, r := range receiptsStorage {
-            processed++
-            key := r.TxHash.Bytes()
-
-            got := s.rlp.Get(txPositionsTable, key, pos)
-            if got == nil {
-                continue
-            }
-            pos.BlockOffset = uint32(r.TransactionIndex)
-            s.rlp.Set(txPositionsTable, key, pos)
-
-            if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 {
-                err = s.flushDBs()
-                if err != nil {
-                    return err
-                }
-                s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "done", processed)
+        if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 || time.Since(prevFlushTime) > s.cfg.MaxNonFlushedPeriod {
+            prevFlushTime = time.Now()
+            err = s.flushDBs()
+            if err != nil {
+                break
             }
+            s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "block", block, "items", atomic.LoadUint32(items))
         }
     }
-    s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "done", processed)
+    for i := range threads {
+        close(threads[i])
+    }
+    wg.Wait()
+    // no need to flush dbs at end as it migration engine does
 
-    return nil
+    s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "block", block, "items", *items)
 
+    return
 }
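For readers unfamiliar with the pattern, the sketch below is a minimal, self-contained illustration of the fan-out approach the diff takes: a single producer walks the work in order and shards each batch to a fixed worker chosen by n % parallels over buffered channels, so every receipt of a given block (and therefore every transaction's position record) is handled by exactly one goroutine. All names and types in the sketch are illustrative and do not come from go-opera.

// fanout_sketch.go — illustrative worker-pool sketch, not repository code.
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    const parallels = 10

    var (
        wg    sync.WaitGroup
        items uint32 // shared counter, updated atomically by the workers
    )

    // worker: drains one channel of batches and processes every item in order
    process := func(input <-chan []int) {
        defer wg.Done()
        for batch := range input {
            for range batch {
                // stand-in for updating one TxPosition.BlockOffset record
                atomic.AddUint32(&items, 1)
            }
        }
    }

    // one buffered channel per worker, mirroring the commit's `threads` slice
    threads := make([]chan []int, parallels)
    for i := range threads {
        threads[i] = make(chan []int, 10)
        wg.Add(1)
        go process(threads[i])
    }

    // producer: batch n always goes to worker n%parallels,
    // so a given batch is never split across goroutines
    for n := 0; n < 1000; n++ {
        threads[n%parallels] <- []int{n, n + 1, n + 2}
    }

    // signal completion and wait for the workers to drain their channels
    for i := range threads {
        close(threads[i])
    }
    wg.Wait()

    fmt.Println("items processed:", atomic.LoadUint32(&items))
}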
