Skip to content

Commit

Permalink
feat: add tx limit to mempool (backport #14014) (#14131)
Browse files Browse the repository at this point in the history
* feat: add tx limit to mempool  (#14014)

* feat: add bounding max tx to mempool

* add bounded condition

* sligh improvement on generator

* remove unbouded option

* add test

* added mempool options mechanism

* mising test

* seting mempool

* change function name

* change function name

* failing test

* Revert "failing test"

This reverts commit d527982.

* fix import block

* changelog entries

* add ability to do unbounded mempool

* remove unesesary variable

* small comments

* change 0 to mean unbounded

* t

* small test fix

* add the ability to be bounded unbounded and disabled

* t

* set default maxtx

* Update docs/docs/building-apps/02-app-mempool.md

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* example for opts

* remove superflues logs entry

* add mempool to configurations

* fix more understandable name

* remove table in favor of bulletpoints

* sender nonce to unbounded

* Update docs/docs/building-apps/02-app-mempool.md

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* Update types/mempool/sender_nonce.go

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* Update types/mempool/sender_nonce.go

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* Update docs/docs/building-apps/02-app-mempool.md

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* Update server/config/config.go

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>

* t

* add comment for options

* fix inport

* fix inport

Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
Co-authored-by: Emmanuel T Odeke <emmanuel@orijtech.com>
(cherry picked from commit 754ca31)

# Conflicts:
#	server/start.go

* fix conflicts

Co-authored-by: Jeancarlo Barrios <JeancarloBarrios@users.noreply.github.com>
Co-authored-by: Julien Robert <julien@rbrt.fr>
  • Loading branch information
3 people committed Dec 2, 2022
1 parent e8cf5f6 commit 3a51a15
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 34 deletions.
10 changes: 10 additions & 0 deletions docs/docs/building-apps/02-app-mempool.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,13 @@ Now that we have walked through the `PrepareProposal` & `ProcessProposal`, we ca
There are countless designs that an application developer can write for a mempool, the core team opted to provide a simple implementation of a nonce mempool. The nonce mempool is a mempool that keeps transactions from an sorted by nonce in order to avoid the issues with nonces.

It works by storing the transation in a list sorted by the transaction nonce. When the proposer asks for transactions to be included in a block it randomly selects a sender and gets the first transaction in the list. It repeats this until the mempool is empty or the block is full.

### Configurations

#### MaxTxs

Its an integer value that sets the mempool in one of three modes, bounded, unbounded, or disabled.

- **negative**: Disabled, mempool does not insert new tx and return early.
- **zero**: Unbounded mempool has no tx limit and will never fail with ErrMempoolTxMaxCapacity.
- **positive**: Bounded, it fails with ErrMempoolTxMaxCapacity when maxTx value is the same as CountTx()
13 changes: 13 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ type StateSyncConfig struct {
SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"`
}

// MempoolConfig defines the configurations for the appside mempool
type MempoolConfig struct {
// MaxTxs defines the behavior of the mempool. A negative value indicates
// the mempool is disabled entirely, zero indicates that the mempool is
// unbounded in how many txs it may contain, and a positive value indicates
// the maximum amount of txs it may contain.
MaxTxs int
}

type (
// StoreConfig defines application configuration for state streaming and other
// storage related operations.
Expand Down Expand Up @@ -234,6 +243,7 @@ type Config struct {
StateSync StateSyncConfig `mapstructure:"state-sync"`
Store StoreConfig `mapstructure:"store"`
Streamers StreamersConfig `mapstructure:"streamers"`
Mempool MempoolConfig `mapstructure:"mempool"`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -323,6 +333,9 @@ func DefaultConfig() *Config {
Keys: []string{"*"},
},
},
Mempool: MempoolConfig{
MaxTxs: 0,
},
}
}

Expand Down
8 changes: 8 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,14 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
write_dir = "{{ .Streamers.File.WriteDir }}"
prefix = "{{ .Streamers.File.Prefix }}"
###############################################################################
### Mempool ###
###############################################################################
[mempool]
max-txs = "{{ .Mempool.MaxTxs }}"
`

var configTemplate *template.Template
Expand Down
10 changes: 8 additions & 2 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
"github.com/cosmos/cosmos-sdk/server/types"
pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types"
"github.com/cosmos/cosmos-sdk/telemetry"
sdktypes "github.com/cosmos/cosmos-sdk/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"
)

const (
Expand Down Expand Up @@ -79,6 +80,9 @@ const (
flagGRPCAddress = "grpc.address"
flagGRPCWebEnable = "grpc-web.enable"
flagGRPCWebAddress = "grpc-web.address"

// mempool flags
FlagMempoolMaxTxs = "mempool.max-txs"
)

// StartCmd runs the service passed in, either stand-alone or in-process with
Expand Down Expand Up @@ -190,6 +194,8 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.

cmd.Flags().Bool(FlagDisableIAVLFastNode, false, "Disable fast node for IAVL tree")

cmd.Flags().Int(FlagMempoolMaxTxs, mempool.DefaultMaxTx, "Sets MaxTx value for the app side mempool")

// add support for all Tendermint-specific command line options
tcmd.AddNodeFlags(cmd)
return cmd
Expand Down Expand Up @@ -471,7 +477,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
return errors.New("'grpc' must be enable in online mode for Rosetta to work")
}

minGasPrices, err := sdktypes.ParseDecCoins(config.MinGasPrices)
minGasPrices, err := sdk.ParseDecCoins(config.MinGasPrices)
if err != nil {
ctx.Logger.Error("failed to parse minimum-gas-prices: ", err)
return err
Expand Down
2 changes: 2 additions & 0 deletions simapp/simd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cosmos/cosmos-sdk/store"
simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"
authcmd "github.com/cosmos/cosmos-sdk/x/auth/client/cli"
"github.com/cosmos/cosmos-sdk/x/auth/types"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
Expand Down Expand Up @@ -306,6 +307,7 @@ func newApp(
baseapp.SetSnapshot(snapshotStore, snapshotOptions),
baseapp.SetIAVLCacheSize(cast.ToInt(appOpts.Get(server.FlagIAVLCacheSize))),
baseapp.SetIAVLDisableFastNode(cast.ToBool(appOpts.Get(server.FlagDisableIAVLFastNode))),
baseapp.SetMempool(mempool.NewSenderNonceMempool(mempool.SenderNonceMaxTxOpt(cast.ToInt(appOpts.Get(server.FlagMempoolMaxTxs))))),
)
}

Expand Down
5 changes: 4 additions & 1 deletion types/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ type Iterator interface {
Tx() sdk.Tx
}

var ErrTxNotFound = errors.New("tx not found in mempool")
var (
ErrTxNotFound = errors.New("tx not found in mempool")
ErrMempoolTxMaxCapacity = errors.New("pool reached max tx capacity")
)
71 changes: 51 additions & 20 deletions types/mempool/sender_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ var (
_ Iterator = (*senderNonceMepoolIterator)(nil)
)

var DefaultMaxTx = 0

// senderNonceMempool is a mempool that prioritizes transactions within a sender by nonce, the lowest first,
// but selects a random sender on each iteration. The mempool is iterated by:
//
Expand All @@ -26,15 +28,27 @@ var (
//
// Note that PrepareProposal could choose to stop iteration before reaching the end if maxBytes is reached.
type senderNonceMempool struct {
senders map[string]*skiplist.SkipList
rnd *rand.Rand
senders map[string]*skiplist.SkipList
rnd *rand.Rand
maxTx int
existingTx map[txKey]bool
}

type SenderNonceOptions func(mp *senderNonceMempool)

type txKey struct {
address string
nonce uint64
}

// NewSenderNonceMempool creates a new mempool that prioritizes transactions by nonce, the lowest first.
func NewSenderNonceMempool() Mempool {
func NewSenderNonceMempool(opts ...SenderNonceOptions) Mempool {
senderMap := make(map[string]*skiplist.SkipList)
existingTx := make(map[txKey]bool)
snp := &senderNonceMempool{
senders: senderMap,
senders: senderMap,
maxTx: DefaultMaxTx,
existingTx: existingTx,
}

var seed int64
Expand All @@ -44,17 +58,30 @@ func NewSenderNonceMempool() Mempool {
}
snp.setSeed(seed)

for _, opt := range opts {
opt(snp)
}

return snp
}

// NewSenderNonceMempoolWithSeed creates a new mempool that prioritizes transactions by nonce, the lowest first and sets the random seed.
func NewSenderNonceMempoolWithSeed(seed int64) Mempool {
senderMap := make(map[string]*skiplist.SkipList)
snp := &senderNonceMempool{
senders: senderMap,
// SenderNonceSeedOpt Option To add a Seed for random type when calling the constructor NewSenderNonceMempool
// Example:
// > random_seed := int64(1000)
// > NewSenderNonceMempool(SenderNonceSeedTxOpt(random_seed))
func SenderNonceSeedOpt(seed int64) SenderNonceOptions {
return func(snp *senderNonceMempool) {
snp.setSeed(seed)
}
}

// SenderNonceMaxTxOpt Option To set limit of max tx when calling the constructor NewSenderNonceMempool
// Example:
// > NewSenderNonceMempool(SenderNonceMaxTxOpt(100))
func SenderNonceMaxTxOpt(maxTx int) SenderNonceOptions {
return func(snp *senderNonceMempool) {
snp.maxTx = maxTx
}
snp.setSeed(seed)
return snp
}

func (snm *senderNonceMempool) setSeed(seed int64) {
Expand All @@ -65,6 +92,12 @@ func (snm *senderNonceMempool) setSeed(seed int64) {
// Insert adds a tx to the mempool. It returns an error if the tx does not have at least one signer.
// priority is ignored.
func (snm *senderNonceMempool) Insert(_ sdk.Context, tx sdk.Tx) error {
if snm.maxTx > 0 && snm.CountTx() >= snm.maxTx {
return ErrMempoolTxMaxCapacity
}
if snm.maxTx < 0 {
return nil
}
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
Expand All @@ -82,7 +115,8 @@ func (snm *senderNonceMempool) Insert(_ sdk.Context, tx sdk.Tx) error {
snm.senders[sender] = senderTxs
}
senderTxs.Set(nonce, tx)

key := txKey{nonce: nonce, address: sender}
snm.existingTx[key] = true
return nil
}

Expand Down Expand Up @@ -117,14 +151,7 @@ func (snm *senderNonceMempool) Select(_ sdk.Context, _ [][]byte) Iterator {

// CountTx returns the total count of txs in the mempool.
func (snm *senderNonceMempool) CountTx() int {
count := 0

// Disable gosec here since we need neither strong randomness nor deterministic iteration.
// #nosec
for _, value := range snm.senders {
count += value.Len()
}
return count
return len(snm.existingTx)
}

// Remove removes a tx from the mempool. It returns an error if the tx does not have at least one signer or the tx
Expand Down Expand Up @@ -154,6 +181,10 @@ func (snm *senderNonceMempool) Remove(tx sdk.Tx) error {
if senderTxs.Len() == 0 {
delete(snm.senders, sender)
}

key := txKey{nonce: nonce, address: sender}
delete(snm.existingTx, key)

return nil
}

Expand Down
21 changes: 11 additions & 10 deletions types/mempool/sender_nonce_property_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package mempool_test

import (
"math/rand"
"sort"

"pgregory.net/rapid"

cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
sdk "github.com/cosmos/cosmos-sdk/types"
mempool "github.com/cosmos/cosmos-sdk/types/mempool"
simtypes "github.com/cosmos/cosmos-sdk/types/simulation"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
Expand All @@ -27,29 +25,31 @@ var (
// same elements input on the mempool should be in the output except for sender nonce duplicates, which are overwritten by the later duplicate entries.
// for every sender transaction tx_n, tx_0.nonce < tx_1.nonce ... < tx_n.nonce

var genAddress = rapid.Custom(func(t *rapid.T) simtypes.Account {
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(rapid.Int64().Draw(t, "seed for account"))), 1)
return accounts[0]
})
func AddressGenerator(t *rapid.T) *rapid.Generator[sdk.AccAddress] {
return rapid.Custom(func(t *rapid.T) sdk.AccAddress {
pkBz := rapid.SliceOfN(rapid.Byte(), 20, 20).Draw(t, "hex")
return sdk.AccAddress(pkBz)
})
}

func testMempoolProperties(t *rapid.T) {

ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
mp := mempool.NewSenderNonceMempool()

genMultipleAddress := rapid.SliceOfNDistinct(genAddress, 1, 10, func(acc simtypes.Account) string {
return acc.Address.String()
genMultipleAddress := rapid.SliceOfNDistinct(AddressGenerator(t), 1, 10, func(acc sdk.AccAddress) string {
return acc.String()
})

accounts := genMultipleAddress.Draw(t, "address")
genTx := rapid.Custom(func(t *rapid.T) testTx {
return testTx{
priority: rapid.Int64Range(0, 1000).Draw(t, "priority"),
nonce: rapid.Uint64().Draw(t, "nonce"),
address: rapid.SampledFrom(accounts).Draw(t, "acc").Address,
address: rapid.SampledFrom(accounts).Draw(t, "acc"),
}
})
genMultipleTX := rapid.SliceOfN(genTx, 1, 500)
genMultipleTX := rapid.SliceOfN(genTx, 1, 5000)

txs := genMultipleTX.Draw(t, "txs")
senderTxRaw := getSenderTxMap(txs)
Expand All @@ -61,6 +61,7 @@ func testMempoolProperties(t *rapid.T) {

iter := mp.Select(ctx, nil)
orderTx := fetchAllTxs(iter)
require.Equal(t, len(orderTx), mp.CountTx())
senderTxOrdered := getSenderTxMap(orderTx)
for key := range senderTxOrdered {
ordered, found := senderTxOrdered[key]
Expand Down
59 changes: 58 additions & 1 deletion types/mempool/sender_nonce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *MempoolTestSuite) TestTxOrder() {
}
for i, tt := range tests {
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
pool := mempool.NewSenderNonceMempoolWithSeed(tt.seed)
pool := mempool.NewSenderNonceMempool(mempool.SenderNonceSeedOpt(tt.seed))
// create test txs and insert into mempool
for i, ts := range tt.txs {
tx := testTx{id: i, priority: int64(ts.p), nonce: uint64(ts.n), address: ts.a}
Expand All @@ -137,3 +137,60 @@ func (s *MempoolTestSuite) TestTxOrder() {
})
}
}

func (s *MempoolTestSuite) TestMaxTx() {
t := s.T()
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1)
mp := mempool.NewSenderNonceMempool(mempool.SenderNonceMaxTxOpt(1))

tx := testTx{
nonce: 0,
address: accounts[0].Address,
priority: rand.Int63(),
}
tx2 := testTx{
nonce: 1,
address: accounts[0].Address,
priority: rand.Int63(),
}

// empty mempool behavior
require.Equal(t, 0, s.mempool.CountTx())
itr := mp.Select(ctx, nil)
require.Nil(t, itr)

ctx = ctx.WithPriority(tx.priority)
err := mp.Insert(ctx, tx)
require.NoError(t, err)
ctx = ctx.WithPriority(tx.priority)
err = mp.Insert(ctx, tx2)
require.Equal(t, mempool.ErrMempoolTxMaxCapacity, err)

}

func (s *MempoolTestSuite) TestTxNotFoundOnSender() {
t := s.T()
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1)
mp := mempool.NewSenderNonceMempool()

txSender := testTx{
nonce: 0,
address: accounts[0].Address,
priority: rand.Int63(),
}

tx := testTx{
nonce: 1,
address: accounts[0].Address,
priority: rand.Int63(),
}

ctx = ctx.WithPriority(tx.priority)
err := mp.Insert(ctx, txSender)
require.NoError(t, err)
err = mp.Remove(tx)
require.Equal(t, mempool.ErrTxNotFound, err)

}

0 comments on commit 3a51a15

Please sign in to comment.