Skip to content

Commit

Permalink
gossip/testEnv.BlockTxs() to put txs in the same block
Browse files Browse the repository at this point in the history
  • Loading branch information
rus-alex committed Nov 30, 2023
1 parent 63be751 commit ff09e6e
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 69 deletions.
116 changes: 74 additions & 42 deletions gossip/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math"
"math/big"
"sync"
"sync/atomic"
"time"

"github.com/Fantom-foundation/lachesis-base/abft"
Expand Down Expand Up @@ -111,7 +110,10 @@ func (em testEmitterWorldExternal) Build(e *inter.MutableEventPayload, onIndexed
return err
}

func (em testEmitterWorldExternal) Broadcast(*inter.EventPayload) {}
func (em testEmitterWorldExternal) Broadcast(emitted *inter.EventPayload) {
// PM listens and will broadcast it
em.env.feed.newEmittedEvent.Send(emitted)
}

type testConfirmedEventsProcessor struct {
blockproc.ConfirmedEventsProcessor
Expand Down Expand Up @@ -199,6 +201,7 @@ func newTestEnv(firstEpoch idx.Epoch, validatorsNum idx.Validator) *testEnv {
env.RegisterEmitter(em)
env.pubkeys = append(env.pubkeys, pubkey)
em.Start()
em.Stop() // to control emitting manually
}

_ = env.store.GenerateSnapshotAt(common.Hash(store.GetBlockState().FinalizedStateRoot), false)
Expand All @@ -221,35 +224,48 @@ func (env *testEnv) GetEvmStateReader() *EvmStateReader {
}

func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) {
env.t = env.t.Add(spent)

env.callback.onEventConfirmed = func(e inter.EventI) {
for _, em := range env.emitters {
em.OnEventConfirmed(e)
}
}
return env.applyTxs(spent, false, txs...)
}

rest := int32(len(txs))
waitFor := make(map[common.Hash]struct{}, len(txs))
for _, tx := range txs {
func (env *testEnv) BlockTxs(spent time.Duration, txs ...*types.Transaction) (types.Receipts, error) {
return env.applyTxs(spent, true, txs...)
}

g, err := tx.EffectiveGasTip(env.store.GetRules().Economy.MinGasPrice)
func (env *testEnv) applyTxs(spent time.Duration, singleBlock bool, txs ...*types.Transaction) (types.Receipts, error) {
env.t = env.t.Add(spent)

fmt.Printf("->> TX: %s, nonce: %d, g: %d, err: %s\n", tx.Hash().String(), tx.Nonce(), g, err)
waitFor[tx.Hash()] = struct{}{}
waitForInEvents := make(map[common.Hash]struct{}, len(txs))
waitForInBlocks := make(map[common.Hash]struct{}, len(txs))
for _, tx := range txs {
waitForInEvents[tx.Hash()] = struct{}{}
waitForInBlocks[tx.Hash()] = struct{}{}
}

receipts := make(types.Receipts, 0, len(txs))

wg := new(sync.WaitGroup)
wg.Add(1)
wg.Add(2)
defer wg.Wait()

newEvents := make(chan *inter.EventPayload)
defer close(newEvents)
eventsSub := env.feed.SubscribeNewEmitted(newEvents)
defer eventsSub.Unsubscribe()
go func() {
defer wg.Done()
for e := range newEvents {
for _, tx := range e.Txs() {
h := tx.Hash()
delete(waitForInEvents, h)
// env.txpool.(*dummyTxPool).Delete(tx.Hash())
}
}
}()

receipts := make(types.Receipts, 0, len(txs))

newBlocks := make(chan evmcore.ChainHeadNotify)
defer close(newBlocks)
chainHeadSub := env.feed.SubscribeNewBlock(newBlocks)
defer chainHeadSub.Unsubscribe()

blocksSub := env.feed.SubscribeNewBlock(newBlocks)
defer blocksSub.Unsubscribe()
go func() {
defer wg.Done()
for b := range newBlocks {
Expand All @@ -260,11 +276,9 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty
rr := env.store.evm.GetReceipts(n, env.EthAPI.signer, b.Block.Hash, b.Block.Transactions)
for _, r := range rr {
env.txpool.(*dummyTxPool).Delete(r.TxHash)
if _, ok := waitFor[r.TxHash]; ok {
fmt.Printf("<<- TX: %s, g: %d\n", r.TxHash.String(), r.GasUsed)
if _, ok := waitForInBlocks[r.TxHash]; ok {
receipts = append(receipts, r)
delete(waitFor, r.TxHash)
atomic.AddInt32(&rest, -1)
delete(waitForInBlocks, r.TxHash)
}
}
}
Expand Down Expand Up @@ -298,22 +312,32 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty
}
}
env.txpool.(*dummyTxPool).Delete(tx)
if _, ok := waitFor[tx]; ok {
fmt.Printf("<<- TX: %s, err: skipped\n", tx.String())
delete(waitFor, tx)
atomic.AddInt32(&rest, -1)
if _, ok := waitForInBlocks[tx]; ok {
delete(waitForInBlocks, tx)
}
}
}
}
}()

byEmitters := func() []*emitter.Emitter {
// datarace does not matter
if singleBlock && len(waitForInEvents) > 0 {
count := len(env.emitters) * 70 / 100
// prevent block creation
return env.emitters[:count]
}
if len(waitForInBlocks) > 0 {
// allow block creation
return env.emitters
}
// ready to stop
return nil
}

env.txpool.AddRemotes(txs)
defer env.txpool.(*dummyTxPool).Clear()

err := env.EmitUntil(func() bool {
return atomic.LoadInt32(&rest) == 0
})
err := env.EmitUntil(byEmitters)

return receipts, err
}
Expand Down Expand Up @@ -346,24 +370,32 @@ func (env *testEnv) ApplyMPs(spent time.Duration, mps ...inter.MisbehaviourProof
env.callback.buildEvent = nil
}()

return env.EmitUntil(func() bool {
return confirmed
})
}
byEmitters := func() []*emitter.Emitter {
if !confirmed {
return env.emitters
}
return nil
}

func (env *testEnv) EmitUntil(stop func() bool) error {
t := time.Now()
return env.EmitUntil(byEmitters)
}

for !stop() {
for _, em := range env.emitters {
func (env *testEnv) EmitUntil(by func() []*emitter.Emitter) error {
start := time.Now()
for {
emitters := by()
if len(emitters) < 1 {
break
}
for _, em := range emitters {
_, err := em.EmitEvent()
if err != nil {
return err
}
}
env.WaitBlockEnd()
env.t = env.t.Add(time.Second)
if time.Since(t) > 30*time.Second {
if time.Since(start) > 30*time.Second {
panic("block doesn't get processed")
}
}
Expand Down
60 changes: 33 additions & 27 deletions gossip/txposition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"
"testing"

"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"

"github.com/Fantom-foundation/go-opera/gossip/contract/ballot"
Expand All @@ -25,44 +26,49 @@ func TestTxIndexing(t *testing.T) {
ballotOption("Option 3"),
}

// valid tx
_, tx1ok, cBallot, err := ballot.DeployBallot(env.Pay(1), env, proposals)
// preparing
_, tx1pre, cBallot, err := ballot.DeployBallot(env.Pay(1), env, proposals)
require.NoError(err)
require.NotNil(cBallot)
require.NotNil(tx1ok)

// invalid tx
tx2reverted, err := cBallot.Vote(env.Pay(2), big.NewInt(0))
require.NotNil(tx1pre)
tx2pre, err := cBallot.GiveRightToVote(env.Pay(1), env.Address(3))
require.NoError(err)
require.NotNil(tx2reverted)

// valid tx
tx3ok, err := cBallot.GiveRightToVote(env.Pay(1), env.Address(3))
require.NotNil(tx2pre)
receipts, err := env.BlockTxs(nextEpoch,
tx1pre,
tx2pre,
)
require.NoError(err)
require.NotNil(tx3ok)

require.Len(receipts, 2)
for i, r := range receipts {
require.Equal(types.ReceiptStatusSuccessful, r.Status, i)
}
// invalid tx
tx4skipped, err := cBallot.Vote(withLowGas(env.Pay(2)), big.NewInt(0))
tx1reverted, err := cBallot.Vote(env.Pay(2), big.NewInt(0))
require.NoError(err)
require.NotNil(tx4skipped)

require.NotNil(tx1reverted)
// valid tx
tx5ok, err := cBallot.Vote(env.Pay(3), big.NewInt(0))
tx2ok, err := cBallot.Vote(env.Pay(3), big.NewInt(0))
require.NoError(err)
require.NotNil(tx5ok)
require.NotNil(tx2ok)
// skipped tx
_, tx3skipped, _, err := ballot.DeployBallot(withLowGas(env.Pay(1)), env, proposals)
require.NoError(err)
require.NotNil(tx3skipped)

receipts, err := env.ApplyTxs(nextEpoch,
tx1ok,
tx2reverted,
tx3ok,
tx4skipped,
tx5ok,
receipts, err = env.BlockTxs(nextEpoch,
tx1reverted,
tx2ok,
tx3skipped,
)
require.NoError(err)

for _, r := range receipts {
fmt.Printf(">>>>>>>>> tx[%s] status %d\n", r.TxHash.String(), r.Status)
require.Len(receipts, 3)
var block *big.Int
for i, r := range receipts {
if block == nil {
block = r.BlockNumber
}
require.Equal(block.Uint64(), r.BlockNumber.Uint64(), i)
}

require.Len(receipts, 0)
}

0 comments on commit ff09e6e

Please sign in to comment.