Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

abci: change client to use multi-reader mutexes (backport #6306) #6873

Merged
merged 3 commits into from
Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type ReqRes struct {
*sync.WaitGroup
*types.Response // Not set atomically, so be sure to use WaitGroup.

mtx tmsync.Mutex
mtx tmsync.RWMutex
done bool // Gets set to true once *after* WaitGroup.Done().
cb func(*types.Response) // A single callback that may be set.
}
Expand Down Expand Up @@ -131,16 +131,16 @@ func (r *ReqRes) InvokeCallback() {
//
// ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.cb
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
r.mtx.Lock()
defer r.mtx.Unlock()
r.done = true
r.mtx.Unlock()
}

func waitGroup1() (wg *sync.WaitGroup) {
Expand Down
8 changes: 4 additions & 4 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type grpcClient struct {
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool

mtx tmsync.Mutex
mtx tmsync.RWMutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
Expand Down Expand Up @@ -146,17 +146,17 @@ func (cli *grpcClient) StopForError(err error) {
}

func (cli *grpcClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.mtx.RLock()
defer cli.mtx.RUnlock()
return cli.err
}

// Set listener for all responses
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}

//----------------------------------------
Expand Down
32 changes: 20 additions & 12 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,35 @@ var _ Client = (*localClient)(nil)
type localClient struct {
service.BaseService

mtx *tmsync.Mutex
mtx *tmsync.RWMutex
types.Application
Callback
}

func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
var _ Client = (*localClient)(nil)

// NewLocalClient creates a local client, which will be directly calling the
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client {
if mtx == nil {
mtx = new(tmsync.Mutex)
mtx = &tmsync.RWMutex{}
}

cli := &localClient{
mtx: mtx,
Application: app,
}

cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
return cli
}

func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.Callback = cb
app.mtx.Unlock()
}

// TODO: change types.Application to include Error()?
Expand All @@ -59,8 +67,8 @@ func (app *localClient) EchoAsync(msg string) *ReqRes {
}

func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Info(req)
return app.callback(
Expand Down Expand Up @@ -103,8 +111,8 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
}

func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Query(req)
return app.callback(
Expand Down Expand Up @@ -212,8 +220,8 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) {
}

func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Info(req)
return &res, nil
Expand Down Expand Up @@ -244,8 +252,8 @@ func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCh
}

func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Query(req)
return &res, nil
Expand Down
8 changes: 4 additions & 4 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type socketClient struct {
reqQueue chan *ReqRes
flushTimer *timer.ThrottleTimer

mtx tmsync.Mutex
mtx tmsync.RWMutex
err error
reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set.
Expand Down Expand Up @@ -99,8 +99,8 @@ func (cli *socketClient) OnStop() {

// Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.mtx.RLock()
defer cli.mtx.RUnlock()
return cli.err
}

Expand All @@ -110,8 +110,8 @@ func (cli *socketClient) Error() error {
// NOTE: callback may get internally generated flush responses.
func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}

//----------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
mtx := new(tmsync.RWMutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func newStateWithConfigAndBlockStore(
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
mtx := new(tmsync.RWMutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)

Expand Down
2 changes: 1 addition & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestReactorWithEvidence(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
mtx := new(tmsync.RWMutex)
proxyAppConnMem := abcicli.NewLocalClient(mtx, app)
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)

Expand Down
4 changes: 2 additions & 2 deletions proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ type ClientCreator interface {
// local proxy uses a mutex on an in-proc app

type localClientCreator struct {
mtx *tmsync.Mutex
mtx *tmsync.RWMutex
app types.Application
}

// NewLocalClientCreator returns a ClientCreator for the given app,
// which will be running locally.
func NewLocalClientCreator(app types.Application) ClientCreator {
return &localClientCreator{
mtx: new(tmsync.Mutex),
mtx: new(tmsync.RWMutex),
app: app,
}
}
Expand Down