Skip to content

Commit

Permalink
no data migration needed
Browse files Browse the repository at this point in the history
  • Loading branch information
rus-alex committed Feb 12, 2024
1 parent 2e5dac1 commit 6548d12
Showing 1 changed file with 1 addition and 126 deletions.
127 changes: 1 addition & 126 deletions gossip/store_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,11 @@ package gossip
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/Fantom-foundation/lachesis-base/hash"
"github.com/Fantom-foundation/lachesis-base/inter/idx"
"github.com/Fantom-foundation/lachesis-base/kvdb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"

"github.com/Fantom-foundation/go-opera/gossip/evmstore"
"github.com/Fantom-foundation/go-opera/inter"
"github.com/Fantom-foundation/go-opera/inter/iblockproc"
"github.com/Fantom-foundation/go-opera/utils/migration"
Expand All @@ -35,9 +28,6 @@ func (s *Store) migrateData() error {
}

err := s.migrations().Exec(versions, s.flushDBs)
if err != nil {
panic(err)
}
return err
}

Expand All @@ -53,8 +43,7 @@ func (s *Store) migrations() *migration.Migration {
Next("erase gossip-async db", s.eraseGossipAsyncDB).
Next("erase SFC API table", s.eraseSfcApiTable).
Next("erase legacy genesis DB", s.eraseGenesisDB).
Next("calculate upgrade heights", s.calculateUpgradeHeights).
Next("EVM TxPosition.BlockOffset fix", s.fixTxPositionBlockOffset)
Next("calculate upgrade heights", s.calculateUpgradeHeights)
}

func unsupportedMigration() error {
Expand Down Expand Up @@ -152,117 +141,3 @@ func (s *Store) calculateUpgradeHeights() error {
}
return nil
}

func (s *Store) fixTxPositionBlockOffset() (err error) {
const (
parallels = 10
)
receiptsTable, _ := s.dbs.OpenDB("evm/r")
txPositionsTable, _ := s.dbs.OpenDB("evm/x")

// for each block's receipts
var (
wg sync.WaitGroup
items = new(uint32)
)
processBlockReceipts := func(input <-chan []*types.ReceiptForStorage) {
defer wg.Done()
pos := new(evmstore.TxPosition)
for rr := range input {
for _, r := range rr {
key := r.TxHash.Bytes()
got := s.rlp.Get(txPositionsTable, key, pos)
if got == nil {
continue
}
pos.BlockOffset = uint32(r.TransactionIndex)
s.rlp.Set(txPositionsTable, key, pos)

atomic.AddUint32(items, 1)
}
}
}

// for each block
var (
blocks = new(uint32)
)
processBlocksRange := func(from, to idx.Block) {
defer wg.Done()
wg.Add(parallels)
threads := make([]chan []*types.ReceiptForStorage, parallels)
for i := range threads {
threads[i] = make(chan []*types.ReceiptForStorage, 10)
go processBlockReceipts(threads[i])
}

it := receiptsTable.NewIterator(nil, from.Bytes())
defer it.Release()
for n := 0; it.Next(); n++ {
if idx.BytesToBlock(it.Key()) > to {
break
}
atomic.AddUint32(blocks, 1)

var receiptsStorage []*types.ReceiptForStorage
err := rlp.DecodeBytes(it.Value(), &receiptsStorage)
if err != nil {
s.Log.Crit("Failed to decode rlp", "err", err, "size", len(it.Value()))
}
threads[n%parallels] <- receiptsStorage
}
for i := range threads {
close(threads[i])
}
}

// status log
var (
done = make(chan struct{})
)
defer close(done)
go func() {
var (
start = time.Now()
prevFlushTime = time.Now()
)
for again := true; again; {
select {
case <-time.After(s.cfg.MaxNonFlushedPeriod / 5):
again = true
case <-done:
again = false
}
s.Log.Info("Txs positions processing", "elapsed", common.PrettyDuration(time.Since(start)), "blocks", atomic.LoadUint32(blocks), "items", atomic.LoadUint32(items))
if s.dbs.NotFlushedSizeEst() > s.cfg.MaxNonFlushedSize/2 || time.Since(prevFlushTime) > s.cfg.MaxNonFlushedPeriod {
prevFlushTime = time.Now()
err = s.flushDBs()
if err != nil {
break
}
}
}
}()

// params
firstBlock := func() idx.Block {
it := receiptsTable.NewIterator(nil, nil)
defer it.Release()
if it.Next() {
return idx.BytesToBlock(it.Key())
}
return 0
}()
lastBlock := s.GetBlockState().LastBlock.Idx

// main start
s.Log.Debug("processBlocksRange", "from", firstBlock, "to", lastBlock)
step := (lastBlock - firstBlock) / parallels
wg.Add(parallels + 1)
for i := idx.Block(0); i <= parallels; i++ {
go processBlocksRange(firstBlock+i*step, firstBlock+(i+1)*step-1)
}
wg.Wait()

return
}

0 comments on commit 6548d12

Please sign in to comment.