Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

abci: change client to use multi-reader mutexes #6306

Merged
merged 8 commits into from
Apr 3, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type ReqRes struct {
*sync.WaitGroup
*types.Response // Not set atomically, so be sure to use WaitGroup.

mtx tmsync.Mutex
mtx tmsync.RWMutex
done bool // Gets set to true once *after* WaitGroup.Done().
cb func(*types.Response) // A single callback that may be set.
}
Expand Down Expand Up @@ -137,16 +137,16 @@ func (r *ReqRes) InvokeCallback() {
//
// ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.cb
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
r.mtx.Lock()
defer r.mtx.Unlock()
r.done = true
r.mtx.Unlock()
}

func waitGroup1() (wg *sync.WaitGroup) {
Expand Down
12 changes: 6 additions & 6 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type grpcClient struct {
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool

mtx tmsync.Mutex
mtx tmsync.RWMutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
Expand Down Expand Up @@ -68,8 +68,8 @@ func (cli *grpcClient) OnStart() error {
go func() {
// Use a separate function to use defer for mutex unlocks (this handles panics)
callCb := func(reqres *ReqRes) {
cli.mtx.Lock()
tychoish marked this conversation as resolved.
Show resolved Hide resolved
defer cli.mtx.Unlock()
cli.mtx.RLock()
defer cli.mtx.RUnlock()

reqres.SetDone()
reqres.Done()
Expand Down Expand Up @@ -149,17 +149,17 @@ func (cli *grpcClient) StopForError(err error) {
}

func (cli *grpcClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.mtx.RLock()
defer cli.mtx.RUnlock()
return cli.err
}

// Set listener for all responses
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}

//----------------------------------------
Expand Down
110 changes: 56 additions & 54 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type localClient struct {
service.BaseService

mtx *tmsync.Mutex
mtx *tmsync.RWMutex
types.Application
Callback
}
Expand All @@ -26,22 +26,24 @@ var _ Client = (*localClient)(nil)
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client {
if mtx == nil {
mtx = new(tmsync.Mutex)
mtx = &tmsync.RWMutex{}
}

cli := &localClient{
mtx: mtx,
Application: app,
}

cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
return cli
}

func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.Callback = cb
app.mtx.Unlock()
}

// TODO: change types.Application to include Error()?
Expand All @@ -55,8 +57,8 @@ func (app *localClient) FlushAsync(ctx context.Context) (*ReqRes, error) {
}

func (app *localClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

return app.callback(
types.ToRequestEcho(msg),
Expand All @@ -65,8 +67,8 @@ func (app *localClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, err
}

func (app *localClient) InfoAsync(ctx context.Context, req types.RequestInfo) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Info(req)
return app.callback(
Expand All @@ -76,8 +78,8 @@ func (app *localClient) InfoAsync(ctx context.Context, req types.RequestInfo) (*
}

func (app *localClient) DeliverTxAsync(ctx context.Context, params types.RequestDeliverTx) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
defer app.mtx.RUnlock()

res := app.Application.DeliverTx(params)
return app.callback(
Expand All @@ -87,8 +89,8 @@ func (app *localClient) DeliverTxAsync(ctx context.Context, params types.Request
}

func (app *localClient) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.CheckTx(req)
return app.callback(
Expand All @@ -98,8 +100,8 @@ func (app *localClient) CheckTxAsync(ctx context.Context, req types.RequestCheck
}

func (app *localClient) QueryAsync(ctx context.Context, req types.RequestQuery) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Query(req)
return app.callback(
Expand All @@ -109,8 +111,8 @@ func (app *localClient) QueryAsync(ctx context.Context, req types.RequestQuery)
}

func (app *localClient) CommitAsync(ctx context.Context) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Commit()
return app.callback(
Expand All @@ -120,8 +122,8 @@ func (app *localClient) CommitAsync(ctx context.Context) (*ReqRes, error) {
}

func (app *localClient) InitChainAsync(ctx context.Context, req types.RequestInitChain) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.InitChain(req)
return app.callback(
Expand All @@ -131,8 +133,8 @@ func (app *localClient) InitChainAsync(ctx context.Context, req types.RequestIni
}

func (app *localClient) BeginBlockAsync(ctx context.Context, req types.RequestBeginBlock) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.BeginBlock(req)
return app.callback(
Expand All @@ -142,8 +144,8 @@ func (app *localClient) BeginBlockAsync(ctx context.Context, req types.RequestBe
}

func (app *localClient) EndBlockAsync(ctx context.Context, req types.RequestEndBlock) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.EndBlock(req)
return app.callback(
Expand All @@ -153,8 +155,8 @@ func (app *localClient) EndBlockAsync(ctx context.Context, req types.RequestEndB
}

func (app *localClient) ListSnapshotsAsync(ctx context.Context, req types.RequestListSnapshots) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.ListSnapshots(req)
return app.callback(
Expand All @@ -164,8 +166,8 @@ func (app *localClient) ListSnapshotsAsync(ctx context.Context, req types.Reques
}

func (app *localClient) OfferSnapshotAsync(ctx context.Context, req types.RequestOfferSnapshot) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.OfferSnapshot(req)
return app.callback(
Expand All @@ -178,8 +180,8 @@ func (app *localClient) LoadSnapshotChunkAsync(
ctx context.Context,
req types.RequestLoadSnapshotChunk,
) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.LoadSnapshotChunk(req)
return app.callback(
Expand All @@ -192,8 +194,8 @@ func (app *localClient) ApplySnapshotChunkAsync(
ctx context.Context,
req types.RequestApplySnapshotChunk,
) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.ApplySnapshotChunk(req)
return app.callback(
Expand All @@ -213,8 +215,8 @@ func (app *localClient) EchoSync(ctx context.Context, msg string) (*types.Respon
}

func (app *localClient) InfoSync(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Info(req)
return &res, nil
Expand All @@ -225,8 +227,8 @@ func (app *localClient) DeliverTxSync(
req types.RequestDeliverTx,
) (*types.ResponseDeliverTx, error) {

app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.DeliverTx(req)
return &res, nil
Expand All @@ -236,8 +238,8 @@ func (app *localClient) CheckTxSync(
ctx context.Context,
req types.RequestCheckTx,
) (*types.ResponseCheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.CheckTx(req)
return &res, nil
Expand All @@ -247,16 +249,16 @@ func (app *localClient) QuerySync(
ctx context.Context,
req types.RequestQuery,
) (*types.ResponseQuery, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Query(req)
return &res, nil
}

func (app *localClient) CommitSync(ctx context.Context) (*types.ResponseCommit, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Commit()
return &res, nil
Expand All @@ -267,8 +269,8 @@ func (app *localClient) InitChainSync(
req types.RequestInitChain,
) (*types.ResponseInitChain, error) {

app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.InitChain(req)
return &res, nil
Expand All @@ -279,8 +281,8 @@ func (app *localClient) BeginBlockSync(
req types.RequestBeginBlock,
) (*types.ResponseBeginBlock, error) {

app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.BeginBlock(req)
return &res, nil
Expand All @@ -291,8 +293,8 @@ func (app *localClient) EndBlockSync(
req types.RequestEndBlock,
) (*types.ResponseEndBlock, error) {

app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.EndBlock(req)
return &res, nil
Expand All @@ -303,8 +305,8 @@ func (app *localClient) ListSnapshotsSync(
req types.RequestListSnapshots,
) (*types.ResponseListSnapshots, error) {

app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.ListSnapshots(req)
return &res, nil
Expand All @@ -315,8 +317,8 @@ func (app *localClient) OfferSnapshotSync(
req types.RequestOfferSnapshot,
) (*types.ResponseOfferSnapshot, error) {

app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.OfferSnapshot(req)
return &res, nil
Expand All @@ -326,8 +328,8 @@ func (app *localClient) LoadSnapshotChunkSync(
ctx context.Context,
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {

app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.LoadSnapshotChunk(req)
return &res, nil
Expand All @@ -337,8 +339,8 @@ func (app *localClient) ApplySnapshotChunkSync(
ctx context.Context,
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {

app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.ApplySnapshotChunk(req)
return &res, nil
Expand Down