Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mempool,rpc: add removetx rpc method #7047

Merged
merged 19 commits into from Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -22,6 +22,8 @@ Special thanks to external contributors on this release:

### FEATURES

- [mempool, rpc] \#7041 Add removeTx operation to the RPC layer. (@tychoish)

### IMPROVEMENTS

### BUG FIXES
1 change: 1 addition & 0 deletions internal/consensus/replay_stubs.go
Expand Up @@ -24,6 +24,7 @@ func (emptyMempool) Size() int { return 0 }
func (emptyMempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error {
return nil
}
func (emptyMempool) RemoveTxByKey(txKey types.TxKey) error { return nil }
func (emptyMempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (emptyMempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (emptyMempool) Update(
Expand Down
12 changes: 6 additions & 6 deletions internal/mempool/cache.go
Expand Up @@ -31,14 +31,14 @@ var _ TxCache = (*LRUTxCache)(nil)
type LRUTxCache struct {
mtx tmsync.Mutex
size int
cacheMap map[[TxKeySize]byte]*list.Element
cacheMap map[types.TxKey]*list.Element
list *list.List
}

func NewLRUTxCache(cacheSize int) *LRUTxCache {
return &LRUTxCache{
size: cacheSize,
cacheMap: make(map[[TxKeySize]byte]*list.Element, cacheSize),
cacheMap: make(map[types.TxKey]*list.Element, cacheSize),
list: list.New(),
}
}
Expand All @@ -53,15 +53,15 @@ func (c *LRUTxCache) Reset() {
c.mtx.Lock()
defer c.mtx.Unlock()

c.cacheMap = make(map[[TxKeySize]byte]*list.Element, c.size)
c.cacheMap = make(map[types.TxKey]*list.Element, c.size)
c.list.Init()
}

func (c *LRUTxCache) Push(tx types.Tx) bool {
c.mtx.Lock()
defer c.mtx.Unlock()

key := TxKey(tx)
key := tx.Key()

moved, ok := c.cacheMap[key]
if ok {
Expand All @@ -72,7 +72,7 @@ func (c *LRUTxCache) Push(tx types.Tx) bool {
if c.list.Len() >= c.size {
front := c.list.Front()
if front != nil {
frontKey := front.Value.([TxKeySize]byte)
frontKey := front.Value.(types.TxKey)
delete(c.cacheMap, frontKey)
c.list.Remove(front)
}
Expand All @@ -88,7 +88,7 @@ func (c *LRUTxCache) Remove(tx types.Tx) {
c.mtx.Lock()
defer c.mtx.Unlock()

key := TxKey(tx)
key := tx.Key()
e := c.cacheMap[key]
delete(c.cacheMap, key)

Expand Down
4 changes: 4 additions & 0 deletions internal/mempool/mempool.go
Expand Up @@ -32,6 +32,10 @@ type Mempool interface {
// its validity and whether it should be added to the mempool.
CheckTx(ctx context.Context, tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error

// RemoveTxByKey removes a transaction, identified by its key,
// from the mempool.
RemoveTxByKey(txKey types.TxKey) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is a TxKey exactly, the same as hash? Ideally if we could remove the tx by hash that could be simpler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup it's the sha256 hash of the transaction


// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
// bytes total with the condition that the total gasWanted must be less than
// maxGas.
Expand Down
1 change: 1 addition & 0 deletions internal/mempool/mock/mempool.go
Expand Up @@ -20,6 +20,7 @@ func (Mempool) Size() int { return 0 }
func (Mempool) CheckTx(_ context.Context, _ types.Tx, _ func(*abci.Response), _ mempool.TxInfo) error {
return nil
}
func (Mempool) RemoveTxByKey(txKey types.TxKey) error { return nil }
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (Mempool) Update(
Expand Down
15 changes: 0 additions & 15 deletions internal/mempool/tx.go
@@ -1,24 +1,9 @@
package mempool

import (
"crypto/sha256"

"github.com/tendermint/tendermint/types"
)

// TxKeySize defines the size of the transaction's key used for indexing.
const TxKeySize = sha256.Size

// TxKey is the fixed length array key used as an index.
func TxKey(tx types.Tx) [TxKeySize]byte {
return sha256.Sum256(tx)
}

// TxHashFromBytes returns the hash of a transaction from raw bytes.
func TxHashFromBytes(tx []byte) []byte {
return types.Tx(tx).Hash()
}

// TxInfo are parameters that get passed when attempting to add a tx to the
// mempool.
type TxInfo struct {
Expand Down
4 changes: 2 additions & 2 deletions internal/mempool/v0/cache_test.go
Expand Up @@ -61,7 +61,7 @@ func TestCacheAfterUpdate(t *testing.T) {
require.NotEqual(t, len(tc.txsInCache), counter,
"cache larger than expected on testcase %d", tcIndex)

nodeVal := node.Value.([sha256.Size]byte)
nodeVal := node.Value.(types.TxKey)
expectedBz := sha256.Sum256([]byte{byte(tc.txsInCache[len(tc.txsInCache)-counter-1])})
// Reference for reading the errors:
// >>> sha256('\x00').hexdigest()
Expand All @@ -71,7 +71,7 @@ func TestCacheAfterUpdate(t *testing.T) {
// >>> sha256('\x02').hexdigest()
// 'dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986'

require.Equal(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex)
require.EqualValues(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex)
counter++
node = node.Next()
}
Expand Down
22 changes: 13 additions & 9 deletions internal/mempool/v0/clist_mempool.go
Expand Up @@ -3,6 +3,7 @@ package v0
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -240,7 +241,7 @@ func (mem *CListMempool) CheckTx(
// Note it's possible a tx is still in the cache but no longer in the mempool
// (eg. after committing a block, txs are removed from mempool but not cache),
// so we only record the sender for txs still in the mempool.
if e, ok := mem.txsMap.Load(mempool.TxKey(tx)); ok {
if e, ok := mem.txsMap.Load(tx.Key()); ok {
memTx := e.(*clist.CElement).Value.(*mempoolTx)
_, loaded := memTx.senders.LoadOrStore(txInfo.SenderID, true)
// TODO: consider punishing peer for dups,
Expand Down Expand Up @@ -327,7 +328,7 @@ func (mem *CListMempool) reqResCb(
// - resCbFirstTime (lock not held) if tx is valid
func (mem *CListMempool) addTx(memTx *mempoolTx) {
e := mem.txs.PushBack(memTx)
mem.txsMap.Store(mempool.TxKey(memTx.tx), e)
mem.txsMap.Store(memTx.tx.Key(), e)
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
}
Expand All @@ -338,7 +339,7 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) {
func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
mem.txs.Remove(elem)
elem.DetachPrev()
mem.txsMap.Delete(mempool.TxKey(tx))
mem.txsMap.Delete(tx.Key())
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))

if removeFromCache {
Expand All @@ -347,13 +348,16 @@ func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromC
}

// RemoveTxByKey removes a transaction from the mempool by its TxKey index.
func (mem *CListMempool) RemoveTxByKey(txKey [mempool.TxKeySize]byte, removeFromCache bool) {
func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error {
if e, ok := mem.txsMap.Load(txKey); ok {
memTx := e.(*clist.CElement).Value.(*mempoolTx)
if memTx != nil {
mem.removeTx(memTx.tx, e.(*clist.CElement), removeFromCache)
mem.removeTx(memTx.tx, e.(*clist.CElement), false)
return nil
}
return errors.New("transaction not found")
}
return errors.New("invalid transaction found")
}

func (mem *CListMempool) isFull(txSize int) error {
Expand Down Expand Up @@ -409,7 +413,7 @@ func (mem *CListMempool) resCbFirstTime(
mem.addTx(memTx)
mem.logger.Debug(
"added good transaction",
"tx", mempool.TxHashFromBytes(tx),
"tx", types.Tx(tx).Hash(),
"res", r,
"height", memTx.height,
"total", mem.Size(),
Expand All @@ -419,7 +423,7 @@ func (mem *CListMempool) resCbFirstTime(
// ignore bad transaction
mem.logger.Debug(
"rejected bad transaction",
"tx", mempool.TxHashFromBytes(tx),
"tx", types.Tx(tx).Hash(),
"peerID", peerP2PID,
"res", r,
"err", postCheckErr,
Expand Down Expand Up @@ -460,7 +464,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
// Good, nothing to do.
} else {
// Tx became invalidated due to newly committed block.
mem.logger.Debug("tx is no longer valid", "tx", mempool.TxHashFromBytes(tx), "res", r, "err", postCheckErr)
mem.logger.Debug("tx is no longer valid", "tx", types.Tx(tx).Hash(), "res", r, "err", postCheckErr)
// NOTE: we remove tx from the cache because it might be good later
mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache)
}
Expand Down Expand Up @@ -598,7 +602,7 @@ func (mem *CListMempool) Update(
// Mempool after:
// 100
// https://github.com/tendermint/tendermint/issues/3322.
if e, ok := mem.txsMap.Load(mempool.TxKey(tx)); ok {
if e, ok := mem.txsMap.Load(tx.Key()); ok {
mem.removeTx(tx, e.(*clist.CElement), false)
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/mempool/v0/clist_mempool_test.go
Expand Up @@ -544,9 +544,9 @@ func TestMempoolTxsBytes(t *testing.T) {
err = mp.CheckTx(context.Background(), []byte{0x06}, nil, mempool.TxInfo{})
require.NoError(t, err)
assert.EqualValues(t, 9, mp.SizeBytes())
mp.RemoveTxByKey(mempool.TxKey([]byte{0x07}), true)
assert.Error(t, mp.RemoveTxByKey(types.Tx([]byte{0x07}).Key()))
assert.EqualValues(t, 9, mp.SizeBytes())
mp.RemoveTxByKey(mempool.TxKey([]byte{0x06}), true)
assert.NoError(t, mp.RemoveTxByKey(types.Tx([]byte{0x06}).Key()))
assert.EqualValues(t, 8, mp.SizeBytes())

}
Expand Down
4 changes: 2 additions & 2 deletions internal/mempool/v0/reactor.go
Expand Up @@ -171,7 +171,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error {

for _, tx := range protoTxs {
if err := r.mempool.CheckTx(context.Background(), types.Tx(tx), nil, txInfo); err != nil {
logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(tx)), "err", err)
logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", types.Tx(tx).Hash()), "err", err)
}
}

Expand Down Expand Up @@ -378,7 +378,7 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
}
r.Logger.Debug(
"gossiped tx to peer",
"tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(memTx.tx)),
"tx", fmt.Sprintf("%X", memTx.tx.Hash()),
"peer", peerID,
)
}
Expand Down
28 changes: 21 additions & 7 deletions internal/mempool/v1/mempool.go
Expand Up @@ -3,6 +3,7 @@ package v1
import (
"bytes"
"context"
"errors"
"fmt"
"sync/atomic"
"time"
Expand Down Expand Up @@ -256,7 +257,7 @@ func (txmp *TxMempool) CheckTx(
return err
}

txHash := mempool.TxKey(tx)
txHash := tx.Key()

// We add the transaction to the mempool's cache and if the transaction already
// exists, i.e. false is returned, then we check if we've seen this transaction
Expand Down Expand Up @@ -304,6 +305,19 @@ func (txmp *TxMempool) CheckTx(
return nil
}

func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
txmp.Lock()
defer txmp.Unlock()

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
txmp.removeTx(wtx, false)
return nil
}

return errors.New("transaction not found")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this get a variable like the already-in-cache case? It seem like the kind of thing the caller might care to distinguish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I don't much see the point in distinguishing the error: there's no circumstance where removing a transaction would be retriable (perhaps eventually via the RPC, you might get a networking error that would make it worth retrying, but you'd want to be able to identify that that error is retriable? which seems/feels like an RPC feature.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point, but is the answer different for the "already in cache" case?

(Separately, and not related to this change, it seems like a mistake that we don't use the error code of the JSON-RPC response to communicate errors responsibly to the client)

}

// Flush flushes out the mempool. It acquires a read-lock, fetches all the
// transactions currently in the transaction store and removes each transaction
// from the store and all indexes and finally resets the cache.
Expand Down Expand Up @@ -451,7 +465,7 @@ func (txmp *TxMempool) Update(
}

// remove the committed transaction from the transaction store and indexes
if wtx := txmp.txStore.GetTxByHash(mempool.TxKey(tx)); wtx != nil {
if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
txmp.removeTx(wtx, false)
}
}
Expand Down Expand Up @@ -629,7 +643,7 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response)
tx := req.GetCheckTx().Tx
wtx := txmp.recheckCursor.Value.(*WrappedTx)
if !bytes.Equal(tx, wtx.tx) {
panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), mempool.TxKey(tx)))
panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), types.Tx(tx).Key()))
}

// Only evaluate transactions that have not been removed. This can happen
Expand All @@ -647,7 +661,7 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response)
txmp.logger.Debug(
"existing transaction no longer valid; failed re-CheckTx callback",
"priority", wtx.priority,
"tx", fmt.Sprintf("%X", mempool.TxHashFromBytes(wtx.tx)),
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"err", err,
"code", checkTxRes.CheckTx.Code,
)
Expand Down Expand Up @@ -784,13 +798,13 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
// the height and time based indexes.
func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
now := time.Now()
expiredTxs := make(map[[mempool.TxKeySize]byte]*WrappedTx)
expiredTxs := make(map[types.TxKey]*WrappedTx)

if txmp.config.TTLNumBlocks > 0 {
purgeIdx := -1
for i, wtx := range txmp.heightIndex.txs {
if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks {
expiredTxs[mempool.TxKey(wtx.tx)] = wtx
expiredTxs[wtx.tx.Key()] = wtx
purgeIdx = i
} else {
// since the index is sorted, we know no other txs can be be purged
Expand All @@ -807,7 +821,7 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
purgeIdx := -1
for i, wtx := range txmp.timestampIndex.txs {
if now.Sub(wtx.timestamp) > txmp.config.TTLDuration {
expiredTxs[mempool.TxKey(wtx.tx)] = wtx
expiredTxs[wtx.tx.Key()] = wtx
purgeIdx = i
} else {
// since the index is sorted, we know no other txs can be be purged
Expand Down
12 changes: 6 additions & 6 deletions internal/mempool/v1/mempool_test.go
Expand Up @@ -226,10 +226,10 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())

txMap := make(map[[mempool.TxKeySize]byte]testTx)
txMap := make(map[types.TxKey]testTx)
priorities := make([]int64, len(tTxs))
for i, tTx := range tTxs {
txMap[mempool.TxKey(tTx.tx)] = tTx
txMap[tTx.tx.Key()] = tTx
priorities[i] = tTx.priority
}

Expand All @@ -241,7 +241,7 @@ func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
ensurePrioritized := func(reapedTxs types.Txs) {
reapedPriorities := make([]int64, len(reapedTxs))
for i, rTx := range reapedTxs {
reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority
reapedPriorities[i] = txMap[rTx.Key()].priority
}

require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
Expand Down Expand Up @@ -276,10 +276,10 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) {
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, int64(5690), txmp.SizeBytes())

txMap := make(map[[mempool.TxKeySize]byte]testTx)
txMap := make(map[types.TxKey]testTx)
priorities := make([]int64, len(tTxs))
for i, tTx := range tTxs {
txMap[mempool.TxKey(tTx.tx)] = tTx
txMap[tTx.tx.Key()] = tTx
priorities[i] = tTx.priority
}

Expand All @@ -291,7 +291,7 @@ func TestTxMempool_ReapMaxTxs(t *testing.T) {
ensurePrioritized := func(reapedTxs types.Txs) {
reapedPriorities := make([]int64, len(reapedTxs))
for i, rTx := range reapedTxs {
reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority
reapedPriorities[i] = txMap[rTx.Key()].priority
}

require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
Expand Down