diff --git a/internal/mempool/v0/clist_mempool.go b/internal/mempool/v0/clist_mempool.go
index 7816730c1fb..0a12c700090 100644
--- a/internal/mempool/v0/clist_mempool.go
+++ b/internal/mempool/v0/clist_mempool.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"context"
 	"errors"
-	"fmt"
 	"sync"
 	"sync/atomic"
 
@@ -450,12 +449,37 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
 	case *abci.Response_CheckTx:
 		tx := req.GetCheckTx().Tx
 		memTx := mem.recheckCursor.Value.(*mempoolTx)
-		if !bytes.Equal(tx, memTx.tx) {
-			panic(fmt.Sprintf(
-				"Unexpected tx response from proxy during recheck\nExpected %X, got %X",
-				memTx.tx,
-				tx))
+
+		// Search through the remaining list of tx to recheck for a transaction that matches
+		// the one we received from the ABCI application.
+		for {
+			if bytes.Equal(tx, memTx.tx) {
+				// We've found a tx in the recheck list that matches the tx that we
+				// received from the ABCI application.
+				// Break, and use this transaction for further checks.
+				break
+			}
+
+			// The response is for a different tx than the one under the cursor.
+			// Log the entry being skipped, then advance to the next candidate.
+			mem.logger.Error(
+				"re-CheckTx transaction mismatch",
+				"got", types.Tx(tx),
+				"expected", memTx.tx,
+			)
+
+			if mem.recheckCursor == mem.recheckEnd {
+				// We reached the end of the recheckTx list without finding a tx
+				// matching the one we received from the ABCI application.
+				// Return without processing any tx.
+				mem.recheckCursor = nil
+				return
+			}
+
+			mem.recheckCursor = mem.recheckCursor.Next()
+			memTx = mem.recheckCursor.Value.(*mempoolTx)
 		}
+
 		var postCheckErr error
 		if mem.postCheck != nil {
 			postCheckErr = mem.postCheck(tx, r.CheckTx)
diff --git a/internal/mempool/v0/clist_mempool_test.go b/internal/mempool/v0/clist_mempool_test.go
index b61a8333e12..a5d6ae8da97 100644
--- a/internal/mempool/v0/clist_mempool_test.go
+++ b/internal/mempool/v0/clist_mempool_test.go
@@ -13,9 +13,11 @@ import (
 	"github.com/gogo/protobuf/proto"
 	gogotypes "github.com/gogo/protobuf/types"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
 	abciclient "github.com/tendermint/tendermint/abci/client"
+	abciclimocks "github.com/tendermint/tendermint/abci/client/mocks"
 	"github.com/tendermint/tendermint/abci/example/kvstore"
 	abciserver "github.com/tendermint/tendermint/abci/server"
 	abci "github.com/tendermint/tendermint/abci/types"
@@ -214,6 +216,57 @@ func TestMempoolUpdate(t *testing.T) {
 	}
 }
 
+func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) {
+	var callback abciclient.Callback
+	mockClient := new(abciclimocks.Client)
+	mockClient.On("Start").Return(nil)
+	mockClient.On("SetLogger", mock.Anything)
+
+	mockClient.On("Error").Return(nil).Times(4)
+	mockClient.On("FlushAsync", mock.Anything).Return(abciclient.NewReqRes(abci.ToRequestFlush()), nil)
+	mockClient.On("SetResponseCallback", mock.MatchedBy(func(cb abciclient.Callback) bool { callback = cb; return true }))
+
+	cc := func() (abciclient.Client, error) {
+		return mockClient, nil
+	}
+
+	mp, cleanup := newMempoolWithApp(cc)
+	defer cleanup()
+
+	// Add 4 transactions to the mempool by calling the mempool's `CheckTx` on each of them.
+	txs := []types.Tx{[]byte{0x01}, []byte{0x02}, []byte{0x03}, []byte{0x04}}
+	for _, tx := range txs {
+		reqRes := abciclient.NewReqRes(abci.ToRequestCheckTx(abci.RequestCheckTx{Tx: tx}))
+		reqRes.Response = abci.ToResponseCheckTx(abci.ResponseCheckTx{Code: abci.CodeTypeOK})
+		// SetDone allows the ReqRes to process its callback synchronously.
+		// This simulates the Response being ready for the client immediately.
+		reqRes.SetDone()
+
+		mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil)
+		err := mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
+		require.NoError(t, err)
+	}
+
+	// Calling update to remove the first transaction from the mempool.
+	// This call also triggers the mempool to recheck its remaining transactions.
+	err := mp.Update(0, []types.Tx{txs[0]}, abciResponses(1, abci.CodeTypeOK), nil, nil)
+	require.NoError(t, err)
+
+	// The mempool has now sent its requests off to the client to be rechecked
+	// and is waiting for the corresponding callbacks to be called.
+	// We now call the mempool-supplied callback on the first and third transaction.
+	// This simulates the client dropping the second request.
+	// Previous versions of this code panicked when the ABCI application missed
+	// a recheck-tx request.
+	resp := abci.ResponseCheckTx{Code: abci.CodeTypeOK}
+	req := abci.RequestCheckTx{Tx: txs[1]}
+	callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp))
+
+	req = abci.RequestCheckTx{Tx: txs[3]}
+	callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp))
+	mockClient.AssertExpectations(t)
+}
+
 func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
 	app := kvstore.NewApplication()
 	cc := abciclient.NewLocalCreator(app)
diff --git a/internal/mempool/v1/mempool.go b/internal/mempool/v1/mempool.go
index a12fbc51bae..d78d9d28778 100644
--- a/internal/mempool/v1/mempool.go
+++ b/internal/mempool/v1/mempool.go
@@ -639,58 +639,92 @@ func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response)
 	txmp.metrics.RecheckTimes.Add(1)
 
 	checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
-	if ok {
-		tx := req.GetCheckTx().Tx
-		wtx := txmp.recheckCursor.Value.(*WrappedTx)
-		if !bytes.Equal(tx, wtx.tx) {
-			panic(fmt.Sprintf("re-CheckTx transaction mismatch; got: %X, expected: %X", wtx.tx.Hash(), types.Tx(tx).Key()))
+	if !ok {
+		// %T is used rather than reflect.Type.Name(): Name() is empty for pointer
+		// types, and reflect.TypeOf(nil) yields a nil Type whose methods panic.
+		txmp.logger.Error("received incorrect type in mempool callback",
+			"expected", fmt.Sprintf("%T", &abci.Response_CheckTx{}),
+			"got", fmt.Sprintf("%T", res.Value),
+		)
+		return
+	}
+	tx := req.GetCheckTx().Tx
+	wtx := txmp.recheckCursor.Value.(*WrappedTx)
+
+	// Search through the remaining list of tx to recheck for a transaction that matches
+	// the one we received from the ABCI application.
+	for {
+		if bytes.Equal(tx, wtx.tx) {
+			// We've found a tx in the recheck list that matches the tx that we
+			// received from the ABCI application.
+			// Break, and use this transaction for further checks.
+			break
 		}
 
-		// Only evaluate transactions that have not been removed. This can happen
-		// if an existing transaction is evicted during CheckTx and while this
-		// callback is being executed for the same evicted transaction.
-		if !txmp.txStore.IsTxRemoved(wtx.hash) {
-			var err error
-			if txmp.postCheck != nil {
-				err = txmp.postCheck(tx, checkTxRes.CheckTx)
-			}
-
-			if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
-				wtx.priority = checkTxRes.CheckTx.Priority
-			} else {
-				txmp.logger.Debug(
-					"existing transaction no longer valid; failed re-CheckTx callback",
-					"priority", wtx.priority,
-					"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
-					"err", err,
-					"code", checkTxRes.CheckTx.Code,
-				)
-
-				if wtx.gossipEl != txmp.recheckCursor {
-					panic("corrupted reCheckTx cursor")
-				}
-
-				txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
-			}
-		}
+		// The response is for a different tx than the one under the cursor.
+		// "got" is the tx from the application; "expected" is the cursor's tx.
+		txmp.logger.Error(
+			"re-CheckTx transaction mismatch",
+			"got", types.Tx(tx).Hash(),
+			"expected", wtx.tx.Hash(),
+		)
 
-		// move reCheckTx cursor to next element
 		if txmp.recheckCursor == txmp.recheckEnd {
+			// We reached the end of the recheckTx list without finding a tx
+			// matching the one we received from the ABCI application.
+			// Return without processing any tx.
 			txmp.recheckCursor = nil
-		} else {
-			txmp.recheckCursor = txmp.recheckCursor.Next()
+			return
 		}
 
-		if txmp.recheckCursor == nil {
-			txmp.logger.Debug("finished rechecking transactions")
+		txmp.recheckCursor = txmp.recheckCursor.Next()
+		wtx = txmp.recheckCursor.Value.(*WrappedTx)
+	}
+
+	// Only evaluate transactions that have not been removed. This can happen
+	// if an existing transaction is evicted during CheckTx and while this
+	// callback is being executed for the same evicted transaction.
+	if !txmp.txStore.IsTxRemoved(wtx.hash) {
+		var err error
+		if txmp.postCheck != nil {
+			err = txmp.postCheck(tx, checkTxRes.CheckTx)
+		}
 
-			if txmp.Size() > 0 {
-				txmp.notifyTxsAvailable()
+		if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
+			wtx.priority = checkTxRes.CheckTx.Priority
+		} else {
+			txmp.logger.Debug(
+				"existing transaction no longer valid; failed re-CheckTx callback",
+				"priority", wtx.priority,
+				"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
+				"err", err,
+				"code", checkTxRes.CheckTx.Code,
+			)
+
+			if wtx.gossipEl != txmp.recheckCursor {
+				panic("corrupted reCheckTx cursor")
 			}
+
+			txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
 		}
+	}
 
-		txmp.metrics.Size.Set(float64(txmp.Size()))
+	// move reCheckTx cursor to next element
+	if txmp.recheckCursor == txmp.recheckEnd {
+		txmp.recheckCursor = nil
+	} else {
+		txmp.recheckCursor = txmp.recheckCursor.Next()
 	}
+
+	if txmp.recheckCursor == nil {
+		txmp.logger.Debug("finished rechecking transactions")
+
+		if txmp.Size() > 0 {
+			txmp.notifyTxsAvailable()
+		}
+	}
+
+	txmp.metrics.Size.Set(float64(txmp.Size()))
 }
 
 // updateReCheckTxs updates the recheck cursors by using the gossipIndex. For