Skip to content

Commit

Permalink
mempool: remove panic when recheck-tx was not sent to ABCI application (
Browse files Browse the repository at this point in the history
#7134) (#7142)

This pull request fixes a panic that exists in both mempools. The panic occurs when the ABCI client misses a response from the ABCI application. This happen when the ABCI client drops the request as a result of a full client queue. The fix here was to loop through the ordered list of recheck-tx in the callback until one matches the currently observed recheck request.

(cherry picked from commit b0130c8)

Co-authored-by: William Banfield <4561443+williambanfield@users.noreply.github.com>
  • Loading branch information
mergify[bot] and williambanfield committed Oct 19, 2021
1 parent 57e4e18 commit dbc72e0
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 46 deletions.
34 changes: 28 additions & 6 deletions internal/mempool/v0/clist_mempool.go
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -450,12 +449,35 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
case *abci.Response_CheckTx:
tx := req.GetCheckTx().Tx
memTx := mem.recheckCursor.Value.(*mempoolTx)
if !bytes.Equal(tx, memTx.tx) {
panic(fmt.Sprintf(
"Unexpected tx response from proxy during recheck\nExpected %X, got %X",
memTx.tx,
tx))

// Search through the remaining list of tx to recheck for a transaction that matches
// the one we received from the ABCI application.
for {
if bytes.Equal(tx, memTx.tx) {
// We've found a tx in the recheck list that matches the tx that we
// received from the ABCI application.
// Break, and use this transaction for further checks.
break
}

mem.logger.Error(
"re-CheckTx transaction mismatch",
"got", types.Tx(tx),
"expected", memTx.tx,
)

if mem.recheckCursor == mem.recheckEnd {
// we reached the end of the recheckTx list without finding a tx
// matching the one we received from the ABCI application.
// Return without processing any tx.
mem.recheckCursor = nil
return
}

mem.recheckCursor = mem.recheckCursor.Next()
memTx = mem.recheckCursor.Value.(*mempoolTx)
}

var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
Expand Down
53 changes: 53 additions & 0 deletions internal/mempool/v0/clist_mempool_test.go
Expand Up @@ -13,9 +13,11 @@ import (
"github.com/gogo/protobuf/proto"
gogotypes "github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

abciclient "github.com/tendermint/tendermint/abci/client"
abciclimocks "github.com/tendermint/tendermint/abci/client/mocks"
"github.com/tendermint/tendermint/abci/example/kvstore"
abciserver "github.com/tendermint/tendermint/abci/server"
abci "github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -214,6 +216,57 @@ func TestMempoolUpdate(t *testing.T) {
}
}

func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) {
var callback abciclient.Callback
mockClient := new(abciclimocks.Client)
mockClient.On("Start").Return(nil)
mockClient.On("SetLogger", mock.Anything)

mockClient.On("Error").Return(nil).Times(4)
mockClient.On("FlushAsync", mock.Anything).Return(abciclient.NewReqRes(abci.ToRequestFlush()), nil)
mockClient.On("SetResponseCallback", mock.MatchedBy(func(cb abciclient.Callback) bool { callback = cb; return true }))

cc := func() (abciclient.Client, error) {
return mockClient, nil
}

mp, cleanup := newMempoolWithApp(cc)
defer cleanup()

// Add 4 transactions to the mempool by calling the mempool's `CheckTx` on each of them.
txs := []types.Tx{[]byte{0x01}, []byte{0x02}, []byte{0x03}, []byte{0x04}}
for _, tx := range txs {
reqRes := abciclient.NewReqRes(abci.ToRequestCheckTx(abci.RequestCheckTx{Tx: tx}))
reqRes.Response = abci.ToResponseCheckTx(abci.ResponseCheckTx{Code: abci.CodeTypeOK})
// SetDone allows the ReqRes to process its callback synchronously.
// This simulates the Response being ready for the client immediately.
reqRes.SetDone()

mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil)
err := mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
require.NoError(t, err)
}

// Calling update to remove the first transaction from the mempool.
// This call also triggers the mempool to recheck its remaining transactions.
err := mp.Update(0, []types.Tx{txs[0]}, abciResponses(1, abci.CodeTypeOK), nil, nil)
require.Nil(t, err)

// The mempool has now sent its requests off to the client to be rechecked
// and is waiting for the corresponding callbacks to be called.
// We now call the mempool-supplied callback on the first and third transaction.
// This simulates the client dropping the second request.
// Previous versions of this code panicked when the ABCI application missed
// a recheck-tx request.
resp := abci.ResponseCheckTx{Code: abci.CodeTypeOK}
req := abci.RequestCheckTx{Tx: txs[1]}
callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp))

req = abci.RequestCheckTx{Tx: txs[3]}
callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp))
mockClient.AssertExpectations(t)
}

func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
app := kvstore.NewApplication()
cc := abciclient.NewLocalCreator(app)
Expand Down
111 changes: 71 additions & 40 deletions internal/mempool/v1/mempool.go
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sync/atomic"
"time"

Expand Down Expand Up @@ -639,58 +640,88 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response)
txmp.metrics.RecheckTimes.Add(1)

checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
if ok {
tx := req.GetCheckTx().Tx
wtx := txmp.recheckCursor.Value.(*WrappedTx)
if !bytes.Equal(tx, wtx.tx) {
panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), types.Tx(tx).Key()))
if !ok {
txmp.logger.Error("received incorrect type in mempool callback",
"expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
"got", reflect.TypeOf(res.Value).Name(),
)
return
}
tx := req.GetCheckTx().Tx
wtx := txmp.recheckCursor.Value.(*WrappedTx)

// Search through the remaining list of tx to recheck for a transaction that matches
// the one we received from the ABCI application.
for {
if bytes.Equal(tx, wtx.tx) {
// We've found a tx in the recheck list that matches the tx that we
// received from the ABCI application.
// Break, and use this transaction for further checks.
break
}

// Only evaluate transactions that have not been removed. This can happen
// if an existing transaction is evicted during CheckTx and while this
// callback is being executed for the same evicted transaction.
if !txmp.txStore.IsTxRemoved(wtx.hash) {
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(tx, checkTxRes.CheckTx)
}

if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
wtx.priority = checkTxRes.CheckTx.Priority
} else {
txmp.logger.Debug(
"existing transaction no longer valid; failed re-CheckTx callback",
"priority", wtx.priority,
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"err", err,
"code", checkTxRes.CheckTx.Code,
)

if wtx.gossipEl != txmp.recheckCursor {
panic("corrupted reCheckTx cursor")
}

txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
}
}
txmp.logger.Error(
"re-CheckTx transaction mismatch",
"got", wtx.tx.Hash(),
"expected", types.Tx(tx).Key(),
)

// move reCheckTx cursor to next element
if txmp.recheckCursor == txmp.recheckEnd {
// we reached the end of the recheckTx list without finding a tx
// matching the one we received from the ABCI application.
// Return without processing any tx.
txmp.recheckCursor = nil
} else {
txmp.recheckCursor = txmp.recheckCursor.Next()
return
}

if txmp.recheckCursor == nil {
txmp.logger.Debug("finished rechecking transactions")
txmp.recheckCursor = txmp.recheckCursor.Next()
wtx = txmp.recheckCursor.Value.(*WrappedTx)
}

// Only evaluate transactions that have not been removed. This can happen
// if an existing transaction is evicted during CheckTx and while this
// callback is being executed for the same evicted transaction.
if !txmp.txStore.IsTxRemoved(wtx.hash) {
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(tx, checkTxRes.CheckTx)
}

if txmp.Size() > 0 {
txmp.notifyTxsAvailable()
if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
wtx.priority = checkTxRes.CheckTx.Priority
} else {
txmp.logger.Debug(
"existing transaction no longer valid; failed re-CheckTx callback",
"priority", wtx.priority,
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"err", err,
"code", checkTxRes.CheckTx.Code,
)

if wtx.gossipEl != txmp.recheckCursor {
panic("corrupted reCheckTx cursor")
}

txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
}
}

txmp.metrics.Size.Set(float64(txmp.Size()))
// move reCheckTx cursor to next element
if txmp.recheckCursor == txmp.recheckEnd {
txmp.recheckCursor = nil
} else {
txmp.recheckCursor = txmp.recheckCursor.Next()
}

if txmp.recheckCursor == nil {
txmp.logger.Debug("finished rechecking transactions")

if txmp.Size() > 0 {
txmp.notifyTxsAvailable()
}
}

txmp.metrics.Size.Set(float64(txmp.Size()))
}

// updateReCheckTxs updates the recheck cursors by using the gossipIndex. For
Expand Down

0 comments on commit dbc72e0

Please sign in to comment.