Skip to content

Commit

Permalink
Revert revert abci: change client to use multi-reader mutexes (backport
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored and Cashmaney committed Nov 9, 2022
1 parent a6dd0d2 commit bae62a7
Show file tree
Hide file tree
Showing 37 changed files with 30 additions and 28 deletions.
Empty file modified DOCKER/build.sh
100755 → 100644
Empty file.
Empty file modified DOCKER/docker-entrypoint.sh
100755 → 100644
Empty file.
Empty file modified DOCKER/push.sh
100755 → 100644
Empty file.
6 changes: 3 additions & 3 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type ReqRes struct {
*sync.WaitGroup
*types.Response // Not set atomically, so be sure to use WaitGroup.

mtx tmsync.Mutex
mtx tmsync.RWMutex

// callbackInvoked as a variable to track if the callback was already
// invoked during the regular execution of the request. This variable
Expand Down Expand Up @@ -138,8 +138,8 @@ func (r *ReqRes) InvokeCallback() {
//
// ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.cb
}

Expand Down
8 changes: 4 additions & 4 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type grpcClient struct {
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool

mtx tmsync.Mutex
mtx tmsync.RWMutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
Expand Down Expand Up @@ -144,17 +144,17 @@ func (cli *grpcClient) StopForError(err error) {
}

func (cli *grpcClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.mtx.RLock()
defer cli.mtx.RUnlock()
return cli.err
}

// Set listener for all responses
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}

//----------------------------------------
Expand Down
26 changes: 14 additions & 12 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var _ Client = (*localClient)(nil)
type localClient struct {
service.BaseService

mtx *tmsync.Mutex
mtx *tmsync.RWMutex
types.Application
Callback
}
Expand All @@ -26,22 +26,24 @@ var _ Client = (*localClient)(nil)
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client {
if mtx == nil {
mtx = new(tmsync.Mutex)
mtx = &tmsync.RWMutex{}
}

cli := &localClient{
mtx: mtx,
Application: app,
}

cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
return cli
}

func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.Callback = cb
app.mtx.Unlock()
}

// TODO: change types.Application to include Error()?
Expand All @@ -65,8 +67,8 @@ func (app *localClient) EchoAsync(msg string) *ReqRes {
}

func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Info(req)
return app.callback(
Expand Down Expand Up @@ -109,8 +111,8 @@ func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
}

func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Query(req)
return app.callback(
Expand Down Expand Up @@ -218,8 +220,8 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) {
}

func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Info(req)
return &res, nil
Expand Down Expand Up @@ -250,8 +252,8 @@ func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCh
}

func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.mtx.RLock()
defer app.mtx.RUnlock()

res := app.Application.Query(req)
return &res, nil
Expand Down
8 changes: 4 additions & 4 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type socketClient struct {
reqQueue chan *ReqRes
flushTimer *timer.ThrottleTimer

mtx tmsync.Mutex
mtx tmsync.RWMutex
err error
reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set.
Expand Down Expand Up @@ -99,8 +99,8 @@ func (cli *socketClient) OnStop() {

// Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.mtx.RLock()
defer cli.mtx.RUnlock()
return cli.err
}

Expand All @@ -110,8 +110,8 @@ func (cli *socketClient) Error() error {
// NOTE: callback may get internally generated flush responses.
func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}

//----------------------------------------
Expand Down
Empty file modified abci/tests/test_app/test.sh
100755 → 100644
Empty file.
Empty file modified abci/tests/test_cli/test.sh
100755 → 100644
Empty file.
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(blockDB)

mtx := new(tmsync.Mutex)
mtx := new(tmsync.RWMutex)
// one for mempool, one for consensus
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)
Expand Down
2 changes: 1 addition & 1 deletion consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func newStateWithConfigAndBlockStore(
blockStore := store.NewBlockStore(blockDB)

// one for mempool, one for consensus
mtx := new(tmsync.Mutex)
mtx := new(tmsync.RWMutex)

proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)
Expand Down
2 changes: 1 addition & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestReactorWithEvidence(t *testing.T) {
blockDB := dbm.NewMemDB()
blockStore := store.NewBlockStore(blockDB)

mtx := new(tmsync.Mutex)
mtx := new(tmsync.RWMutex)
memplMetrics := mempl.NopMetrics()
// one for mempool, one for consensus
proxyAppConnCon := abcicli.NewLocalClient(mtx, app)
Expand Down
Empty file modified docs/post.sh
100755 → 100644
Empty file.
Empty file modified docs/pre.sh
100755 → 100644
Empty file.
Empty file modified docs/tendermint-core-image.jpg
100755 → 100644
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Empty file modified networks/local/localnode/wrapper.sh
100755 → 100644
Empty file.
Empty file modified networks/remote/ansible/inventory/digital_ocean.py
100755 → 100644
Empty file.
4 changes: 2 additions & 2 deletions proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ type ClientCreator interface {
// local proxy uses a mutex on an in-proc app

type localClientCreator struct {
mtx *tmsync.Mutex
mtx *tmsync.RWMutex
app types.Application
}

// NewLocalClientCreator returns a ClientCreator for the given app,
// which will be running locally.
func NewLocalClientCreator(app types.Application) ClientCreator {
return &localClientCreator{
mtx: new(tmsync.Mutex),
mtx: new(tmsync.RWMutex),
app: app,
}
}
Expand Down
Empty file modified rpc/jsonrpc/test/integration_test.sh
100755 → 100644
Empty file.
Empty file modified scripts/authors.sh
100755 → 100644
Empty file.
Empty file modified scripts/dist.sh
100755 → 100644
Empty file.
Empty file modified scripts/get_nodejs.sh
100755 → 100644
Empty file.
Empty file modified scripts/mockery_generate.sh
100755 → 100644
Empty file.
Empty file modified scripts/proto-gen.sh
100755 → 100644
Empty file.
Empty file modified scripts/qa/reporting/latency_throughput.py
100755 → 100644
Empty file.
Empty file modified spec/ivy-proofs/check_proofs.sh
100755 → 100644
Empty file.
Empty file modified spec/ivy-proofs/count_lines.sh
100755 → 100644
Empty file.
Empty file modified spec/light-client/accountability/run.sh
100755 → 100644
Empty file.
Empty file modified test/app/clean.sh
100755 → 100644
Empty file.
Empty file modified test/app/counter_test.sh
100755 → 100644
Empty file.
Empty file modified test/app/kvstore_test.sh
100755 → 100644
Empty file.
Empty file modified test/app/test.sh
100755 → 100644
Empty file.
Empty file modified test/e2e/docker/entrypoint
100755 → 100644
Empty file.
Empty file modified test/e2e/docker/entrypoint-builtin
100755 → 100644
Empty file.
Empty file modified test/e2e/docker/entrypoint-maverick
100755 → 100644
Empty file.
Empty file modified test/e2e/run-multiple.sh
100755 → 100644
Empty file.
Empty file modified test/loadtime/basic.sh
100755 → 100644
Empty file.

0 comments on commit bae62a7

Please sign in to comment.