Skip to content

Commit

Permalink
Revert "abci: change client to use multi-reader mutexes (tendermint#6306
Browse files Browse the repository at this point in the history
)"

This reverts commit 1c4dbe3.
  • Loading branch information
tychoish committed Oct 12, 2021
1 parent 4829595 commit 2684c61
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 30 deletions.
8 changes: 4 additions & 4 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type ReqRes struct {
*sync.WaitGroup
*types.Response // Not set atomically, so be sure to use WaitGroup.

mtx tmsync.RWMutex
mtx tmsync.Mutex
done bool // Gets set to true once *after* WaitGroup.Done().
cb func(*types.Response) // A single callback that may be set.
}
Expand Down Expand Up @@ -137,16 +137,16 @@ func (r *ReqRes) InvokeCallback() {
//
// ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) {
r.mtx.RLock()
defer r.mtx.RUnlock()
r.mtx.Lock()
defer r.mtx.Unlock()
return r.cb
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
r.mtx.Lock()
defer r.mtx.Unlock()
r.done = true
r.mtx.Unlock()
}

func waitGroup1() (wg *sync.WaitGroup) {
Expand Down
2 changes: 1 addition & 1 deletion abci/client/creators.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Creator func() (Client, error)
// NewLocalCreator returns a Creator for the given app,
// which will be running locally.
func NewLocalCreator(app types.Application) Creator {
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)

return func() (Client, error) {
return NewLocalClient(mtx, app), nil
Expand Down
8 changes: 4 additions & 4 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type grpcClient struct {
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool

mtx tmsync.RWMutex
mtx tmsync.Mutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
Expand Down Expand Up @@ -149,17 +149,17 @@ func (cli *grpcClient) StopForError(err error) {
}

func (cli *grpcClient) Error() error {
cli.mtx.RLock()
defer cli.mtx.RUnlock()
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}

// Set listener for all responses
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}

//----------------------------------------
Expand Down
26 changes: 12 additions & 14 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type localClient struct {
service.BaseService

mtx *tmsync.RWMutex
mtx *tmsync.Mutex
types.Application
Callback
}
Expand All @@ -26,24 +26,22 @@ var _ Client = (*localClient)(nil)
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client {
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = &tmsync.RWMutex{}
mtx = new(tmsync.Mutex)
}

cli := &localClient{
mtx: mtx,
Application: app,
}

cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
return cli
}

func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.Callback = cb
app.mtx.Unlock()
}

// TODO: change types.Application to include Error()?
Expand All @@ -67,8 +65,8 @@ func (app *localClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, err
}

func (app *localClient) InfoAsync(ctx context.Context, req types.RequestInfo) (*ReqRes, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Info(req)
return app.callback(
Expand Down Expand Up @@ -100,8 +98,8 @@ func (app *localClient) CheckTxAsync(ctx context.Context, req types.RequestCheck
}

func (app *localClient) QueryAsync(ctx context.Context, req types.RequestQuery) (*ReqRes, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Query(req)
return app.callback(
Expand Down Expand Up @@ -215,8 +213,8 @@ func (app *localClient) EchoSync(ctx context.Context, msg string) (*types.Respon
}

func (app *localClient) InfoSync(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Info(req)
return &res, nil
Expand Down Expand Up @@ -249,8 +247,8 @@ func (app *localClient) QuerySync(
ctx context.Context,
req types.RequestQuery,
) (*types.ResponseQuery, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.Query(req)
return &res, nil
Expand Down
8 changes: 4 additions & 4 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type socketClient struct {

reqQueue chan *reqResWithContext

mtx tmsync.RWMutex
mtx tmsync.Mutex
err error
reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set.
Expand Down Expand Up @@ -102,8 +102,8 @@ func (cli *socketClient) OnStop() {

// Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error {
cli.mtx.RLock()
defer cli.mtx.RUnlock()
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}

Expand All @@ -113,8 +113,8 @@ func (cli *socketClient) Error() error {
// NOTE: callback may get internally generated flush responses.
func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}

//----------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion internal/consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion internal/consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func newStateWithConfigAndBlockStore(
blockStore *store.BlockStore,
) *State {
// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion internal/consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func TestReactorWithEvidence(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(mtx, app)

Expand Down

0 comments on commit 2684c61

Please sign in to comment.