Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "abci: change client to use multi-reader mutexes (#6306)" (backport #7106) #7110

Merged
merged 3 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ Special thanks to external contributors on this release:
### FEATURES

- [cli] [#7033](https://github.com/tendermint/tendermint/pull/7033) Add a `rollback` command to rollback to the previous tendermint state in the event of an incorrect app hash. (@cmwaters)

### IMPROVEMENTS

### BUG FIXES

- [\#7106](https://github.com/tendermint/tendermint/pull/7106) Revert mutex change to ABCI Clients (@tychoish).
8 changes: 4 additions & 4 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type ReqRes struct {
*sync.WaitGroup
*types.Response // Not set atomically, so be sure to use WaitGroup.

mtx tmsync.RWMutex
mtx tmsync.Mutex
done bool // Gets set to true once *after* WaitGroup.Done().
cb func(*types.Response) // A single callback that may be set.
}
Expand Down Expand Up @@ -137,16 +137,16 @@ func (r *ReqRes) InvokeCallback() {
//
// ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) {
r.mtx.RLock()
defer r.mtx.RUnlock()
r.mtx.Lock()
defer r.mtx.Unlock()
return r.cb
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
r.mtx.Lock()
defer r.mtx.Unlock()
r.done = true
r.mtx.Unlock()
}

func waitGroup1() (wg *sync.WaitGroup) {
Expand Down
2 changes: 1 addition & 1 deletion abci/client/creators.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Creator func() (Client, error)
// NewLocalCreator returns a Creator for the given app,
// which will be running locally.
func NewLocalCreator(app types.Application) Creator {
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)

return func() (Client, error) {
return NewLocalClient(mtx, app), nil
Expand Down
8 changes: 4 additions & 4 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type grpcClient struct {
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool

mtx tmsync.RWMutex
mtx tmsync.Mutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
Expand Down Expand Up @@ -149,17 +149,17 @@ func (cli *grpcClient) StopForError(err error) {
}

func (cli *grpcClient) Error() error {
cli.mtx.RLock()
defer cli.mtx.RUnlock()
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}

// Set listener for all responses
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}

//----------------------------------------
Expand Down
26 changes: 12 additions & 14 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type localClient struct {
service.BaseService

mtx *tmsync.RWMutex
mtx *tmsync.Mutex
types.Application
Callback
}
Expand All @@ -26,24 +26,22 @@ var _ Client = (*localClient)(nil)
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client {
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = &tmsync.RWMutex{}
mtx = new(tmsync.Mutex)
}

cli := &localClient{
mtx: mtx,
Application: app,
}

cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
return cli
}

func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.Callback = cb
app.mtx.Unlock()
}

// TODO: change types.Application to include Error()?
Expand All @@ -67,8 +65,8 @@ func (app *localClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, err
}

func (app *localClient) InfoAsync(ctx context.Context, req types.RequestInfo) (*ReqRes, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Info(req)
return app.callback(
Expand Down Expand Up @@ -100,8 +98,8 @@ func (app *localClient) CheckTxAsync(ctx context.Context, req types.RequestCheck
}

func (app *localClient) QueryAsync(ctx context.Context, req types.RequestQuery) (*ReqRes, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Query(req)
return app.callback(
Expand Down Expand Up @@ -215,8 +213,8 @@ func (app *localClient) EchoSync(ctx context.Context, msg string) (*types.Respon
}

func (app *localClient) InfoSync(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Info(req)
return &res, nil
Expand Down Expand Up @@ -249,8 +247,8 @@ func (app *localClient) QuerySync(
ctx context.Context,
req types.RequestQuery,
) (*types.ResponseQuery, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Query(req)
return &res, nil
Expand Down
8 changes: 4 additions & 4 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type socketClient struct {

reqQueue chan *reqResWithContext

mtx tmsync.RWMutex
mtx tmsync.Mutex
err error
reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set.
Expand Down Expand Up @@ -102,8 +102,8 @@ func (cli *socketClient) OnStop() {

// Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error {
cli.mtx.RLock()
defer cli.mtx.RUnlock()
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}

Expand All @@ -113,8 +113,8 @@ func (cli *socketClient) Error() error {
// NOTE: callback may get internally generated flush responses.
func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}

//----------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion internal/consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion internal/consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func newStateWithConfigAndBlockStore(
blockStore *store.BlockStore,
) *State {
// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion internal/consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func TestReactorWithEvidence(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(mtx, app)

Expand Down