From a299d6c1d5a786586a9330a40ca07f6aac889e61 Mon Sep 17 00:00:00 2001
From: lysu
Date: Thu, 17 Oct 2019 20:27:52 +0800
Subject: [PATCH 1/3] tikv: establish superbatch connection without blocking

---
 store/tikv/client.go       | 21 ++++-------
 store/tikv/client_batch.go | 76 ++++++++++++++++++++++++++++++++------
 2 files changed, 71 insertions(+), 26 deletions(-)

diff --git a/store/tikv/client.go b/store/tikv/client.go
index 4061ef269b1b..64882a8b6d31 100644
--- a/store/tikv/client.go
+++ b/store/tikv/client.go
@@ -148,23 +148,16 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
 		a.v[i] = conn
 
 		if allowBatch {
-			// Initialize batch streaming clients.
-			tikvClient := tikvpb.NewTikvClient(conn)
-			streamClient, err := tikvClient.BatchCommands(context.TODO())
-			if err != nil {
-				a.Close()
-				return errors.Trace(err)
-			}
 			batchClient := &batchCommandsClient{
-				target:  a.target,
-				conn:    conn,
-				client:  streamClient,
-				batched: sync.Map{},
-				idAlloc: 0,
-				closed:  0,
+				target:        a.target,
+				conn:          conn,
+				batched:       sync.Map{},
+				idAlloc:       0,
+				closed:        0,
+				tikvClientCfg: cfg.TiKVClient,
+				tikvLoad:      &a.tikvTransportLayerLoad,
 			}
 			a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
-			go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad)
 		}
 	}
 	go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, done)
diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go
index cbcc68e6981b..c5d0202c0893 100644
--- a/store/tikv/client_batch.go
+++ b/store/tikv/client_batch.go
@@ -16,6 +16,7 @@ package tikv
 
 import (
 	"context"
+	"google.golang.org/grpc/connectivity"
 	"math"
 	"sync"
 	"sync/atomic"
@@ -200,6 +201,9 @@ type batchCommandsClient struct {
 	batched sync.Map
 	idAlloc uint64
 
+	tikvClientCfg config.TiKVClient
+	tikvLoad      *uint64
+
 	// closed indicates the batch client is closed explicitly or not.
 	closed int32
 	// tryLock protects client when re-create the streaming.
@@ -245,20 +249,35 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
 	})
 }
 
-func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
-	c.failPendingRequests(err) // fail all pending requests.
-
-	// Re-establish a application layer stream. TCP layer is handled by gRPC.
-	tikvClient := tikvpb.NewTikvClient(c.conn)
-	streamClient, err := tikvClient.BatchCommands(context.TODO())
+func (c *batchCommandsClient) reCreateStreamingClientOnce(perr error) error {
+	c.failPendingRequests(perr) // fail all pending requests.
+	var err error
+	dialCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
+	for {
+		s := c.conn.GetState()
+		if s == connectivity.Ready {
+			cancel()
+			break
+		}
+		if !c.conn.WaitForStateChange(dialCtx, s) {
+			cancel()
+			err = dialCtx.Err()
+			break
+		}
+	}
 	if err == nil {
-		logutil.BgLogger().Info(
-			"batchRecvLoop re-create streaming success",
-			zap.String("target", c.target),
-		)
-		c.client = streamClient
+		// Re-establish an application layer stream. TCP layer is handled by gRPC.
+		tikvClient := tikvpb.NewTikvClient(c.conn)
+		streamClient, err := tikvClient.BatchCommands(context.TODO())
+		if err == nil {
+			logutil.BgLogger().Info(
+				"batchRecvLoop re-create streaming success",
+				zap.String("target", c.target),
+			)
+			c.client = streamClient
-		return nil
+			return nil
+		}
 	}
 	logutil.BgLogger().Info(
 		"batchRecvLoop re-create streaming fail",
@@ -455,10 +474,43 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*
 		RequestIds: requestIDs,
 	}
 
+	if err := cli.initBatchClient(); err != nil {
+		return
+	}
+
 	cli.send(req, entries)
 	return
 }
 
+func (c *batchCommandsClient) initBatchClient() error {
+	if c.client != nil {
+		return nil
+	}
+
+	dialCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
+	for {
+		s := c.conn.GetState()
+		if s == connectivity.Ready {
+			cancel()
+			break
+		}
+		if !c.conn.WaitForStateChange(dialCtx, s) {
+			cancel()
+			return dialCtx.Err()
+		}
+	}
+
+	// Initialize batch streaming clients.
+	tikvClient := tikvpb.NewTikvClient(c.conn)
+	streamClient, err := tikvClient.BatchCommands(context.TODO())
+	if err != nil {
+		return errors.Trace(err)
+	}
+	c.client = streamClient
+	go c.batchRecvLoop(c.tikvClientCfg, c.tikvLoad)
+	return nil
+}
+
 func (a *batchConn) Close() {
 	// Close all batchRecvLoop.
 	for _, c := range a.batchCommandsClients {
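
The waiting loop this patch introduces is the standard gRPC connectivity-watch idiom: poll GetState, then block in WaitForStateChange until the connection leaves that state or the deadline fires. A minimal standalone sketch of the same idiom (the name waitConnReady, the timeout value, and the dial target are illustrative, not part of the patch):

    // Sketch only: the connectivity-watch idiom used by initBatchClient
    // and reCreateStreamingClientOnce, extracted into a standalone program.
    package main

    import (
        "context"
        "log"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/connectivity"
    )

    // waitConnReady blocks until conn is Ready or the timeout expires.
    func waitConnReady(conn *grpc.ClientConn, timeout time.Duration) error {
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        defer cancel()
        for {
            s := conn.GetState()
            if s == connectivity.Ready {
                return nil
            }
            // WaitForStateChange returns false when ctx expires before
            // the connection leaves state s.
            if !conn.WaitForStateChange(ctx, s) {
                return ctx.Err()
            }
        }
    }

    func main() {
        // Dial is non-blocking by default: it returns at once and connects
        // in the background. "127.0.0.1:20160" is an assumed TiKV address.
        conn, err := grpc.Dial("127.0.0.1:20160", grpc.WithInsecure())
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
        if err := waitConnReady(conn, 5*time.Second); err != nil {
            log.Printf("connection not ready: %v", err)
        }
    }

Because Dial does not block, an unreachable TiKV store no longer fails connArray.Init; the first request through the batch connection pays the wait instead.
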
From 77022520c67117dd2a00a92f76ddaaad2db6a909 Mon Sep 17 00:00:00 2001
From: lysu
Date: Thu, 17 Oct 2019 20:45:46 +0800
Subject: [PATCH 2/3] add wait-establish metric and warning log

---
 metrics/tikvclient.go      |  9 +++++++++
 store/tikv/client_batch.go | 11 +++++++++++
 2 files changed, 20 insertions(+)

diff --git a/metrics/tikvclient.go b/metrics/tikvclient.go
index 8b9cb514338b..16ca5ee44d3f 100644
--- a/metrics/tikvclient.go
+++ b/metrics/tikvclient.go
@@ -206,6 +206,15 @@ var (
 			Help:      "batch client unavailable",
 		})
 
+	TiKVBatchClientWaitEstablish = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "tidb",
+			Subsystem: "tikvclient",
+			Name:      "batch_client_wait_establish_seconds",
+			Buckets:   prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s
+			Help:      "batch client wait establish time",
+		})
+
 	TiKVRangeTaskStats = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: "tidb",
diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go
index c5d0202c0893..40ff9ce522e5 100644
--- a/store/tikv/client_batch.go
+++ b/store/tikv/client_batch.go
@@ -252,15 +252,18 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
 func (c *batchCommandsClient) reCreateStreamingClientOnce(perr error) error {
 	c.failPendingRequests(perr) // fail all pending requests.
 	var err error
+	start := time.Now()
 	dialCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
 	for {
 		s := c.conn.GetState()
 		if s == connectivity.Ready {
 			cancel()
+			metrics.TiKVBatchClientWaitEstablish.Observe(time.Since(start).Seconds())
 			break
 		}
 		if !c.conn.WaitForStateChange(dialCtx, s) {
 			cancel()
+			metrics.TiKVBatchClientWaitEstablish.Observe(time.Since(start).Seconds())
 			err = dialCtx.Err()
 			break
 		}
 	}
@@ -475,6 +478,11 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*
 	}
 
 	if err := cli.initBatchClient(); err != nil {
+		logutil.BgLogger().Warn(
+			"init create streaming fail",
+			zap.String("target", cli.target),
+			zap.Error(err),
+		)
 		return
 	}
 
@@ -487,15 +495,18 @@ func (c *batchCommandsClient) initBatchClient() error {
 		return nil
 	}
 
+	start := time.Now()
 	dialCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
 	for {
 		s := c.conn.GetState()
 		if s == connectivity.Ready {
 			cancel()
+			metrics.TiKVBatchClientWaitEstablish.Observe(time.Since(start).Seconds())
 			break
 		}
 		if !c.conn.WaitForStateChange(dialCtx, s) {
 			cancel()
+			metrics.TiKVBatchClientWaitEstablish.Observe(time.Since(start).Seconds())
 			return dialCtx.Err()
 		}
 	}
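
Both exits of the waiting loop observe the same histogram, so every attempt, successful or timed out, records how long the sender was stalled. The buckets are exponential: ExponentialBuckets(0.001, 2, 20) spans 1ms up to roughly 524s. A self-contained sketch of the same record-on-both-paths pattern (the local histogram variable and recordWait are illustrative; TiDB registers TiKVBatchClientWaitEstablish through its own metrics package):

    package main

    import (
        "time"

        "github.com/prometheus/client_golang/prometheus"
    )

    // Illustrative local copy of the histogram added in metrics/tikvclient.go.
    var batchClientWaitEstablish = prometheus.NewHistogram(
        prometheus.HistogramOpts{
            Namespace: "tidb",
            Subsystem: "tikvclient",
            Name:      "batch_client_wait_establish_seconds",
            Buckets:   prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s
            Help:      "batch client wait establish time",
        })

    func init() {
        prometheus.MustRegister(batchClientWaitEstablish)
    }

    // recordWait times an arbitrary wait and records it, succeed or fail,
    // which is what the two Observe calls in the loop above amount to.
    func recordWait(wait func() error) error {
        start := time.Now()
        err := wait()
        batchClientWaitEstablish.Observe(time.Since(start).Seconds())
        return err
    }

    func main() {
        _ = recordWait(func() error {
            time.Sleep(10 * time.Millisecond) // stand-in for the connectivity loop
            return nil
        })
    }
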
From bdd73b09e44c699134760632e4261afd6bc7ae98 Mon Sep 17 00:00:00 2001
From: lysu
Date: Thu, 17 Oct 2019 21:07:38 +0800
Subject: [PATCH 3/3] address review comments

---
 metrics/tikvclient.go      |  9 ------
 store/tikv/client_batch.go | 59 +++++++-------------------------------
 2 files changed, 11 insertions(+), 57 deletions(-)

diff --git a/metrics/tikvclient.go b/metrics/tikvclient.go
index 16ca5ee44d3f..8b9cb514338b 100644
--- a/metrics/tikvclient.go
+++ b/metrics/tikvclient.go
@@ -206,15 +206,6 @@ var (
 			Help:      "batch client unavailable",
 		})
 
-	TiKVBatchClientWaitEstablish = prometheus.NewHistogram(
-		prometheus.HistogramOpts{
-			Namespace: "tidb",
-			Subsystem: "tikvclient",
-			Name:      "batch_client_wait_establish_seconds",
-			Buckets:   prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s
-			Help:      "batch client wait establish time",
-		})
-
 	TiKVRangeTaskStats = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: "tidb",
diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go
index 40ff9ce522e5..d10ca55b6829 100644
--- a/store/tikv/client_batch.go
+++ b/store/tikv/client_batch.go
@@ -16,7 +16,6 @@ package tikv
 
 import (
 	"context"
-	"google.golang.org/grpc/connectivity"
 	"math"
 	"sync"
 	"sync/atomic"
@@ -249,38 +248,19 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
 	})
 }
 
-func (c *batchCommandsClient) reCreateStreamingClientOnce(perr error) error {
-	c.failPendingRequests(perr) // fail all pending requests.
-	var err error
-	start := time.Now()
-	dialCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
-	for {
-		s := c.conn.GetState()
-		if s == connectivity.Ready {
-			cancel()
-			metrics.TiKVBatchClientWaitEstablish.Observe(time.Since(start).Seconds())
-			break
-		}
-		if !c.conn.WaitForStateChange(dialCtx, s) {
-			cancel()
-			metrics.TiKVBatchClientWaitEstablish.Observe(time.Since(start).Seconds())
-			err = dialCtx.Err()
-			break
-		}
-	}
+func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
+	c.failPendingRequests(err) // fail all pending requests.
+	// Re-establish an application layer stream. TCP layer is handled by gRPC.
-		tikvClient := tikvpb.NewTikvClient(c.conn)
-		streamClient, err := tikvClient.BatchCommands(context.TODO())
+	tikvClient := tikvpb.NewTikvClient(c.conn)
+	streamClient, err := tikvClient.BatchCommands(context.TODO())
 	if err == nil {
-			logutil.BgLogger().Info(
-				"batchRecvLoop re-create streaming success",
-				zap.String("target", c.target),
-			)
-			c.client = streamClient
+		logutil.BgLogger().Info(
+			"batchRecvLoop re-create streaming success",
+			zap.String("target", c.target),
+		)
+		c.client = streamClient
-			return nil
+		return nil
-		}
 	}
@@ -494,23 +474,6 @@ func (c *batchCommandsClient) initBatchClient() error {
 	if c.client != nil {
 		return nil
 	}
-
-	start := time.Now()
-	dialCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
-	for {
-		s := c.conn.GetState()
-		if s == connectivity.Ready {
-			cancel()
-			metrics.TiKVBatchClientWaitEstablish.Observe(time.Since(start).Seconds())
-			break
-		}
-		if !c.conn.WaitForStateChange(dialCtx, s) {
-			cancel()
-			metrics.TiKVBatchClientWaitEstablish.Observe(time.Since(start).Seconds())
-			return dialCtx.Err()
-		}
-	}
-
 	// Initialize batch streaming clients.
 	tikvClient := tikvpb.NewTikvClient(c.conn)
 	streamClient, err := tikvClient.BatchCommands(context.TODO())
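
After this round of review, the connectivity-wait loops and the histogram are dropped again; what survives is the lazy initialization: no stream exists at dial time, and the first send creates the stream and starts batchRecvLoop. A compilable sketch of that final shape (lazyStream, its mutex, and the dial target are illustrative stand-ins; the patch itself appears to need no lock because initBatchClient is only invoked from getClientAndSend on the batch send path):

    package main

    import (
        "context"
        "log"
        "sync"

        "github.com/pingcap/kvproto/pkg/tikvpb"
        "google.golang.org/grpc"
    )

    // lazyStream stands in for batchCommandsClient: client stays nil until
    // the first send, and the receive loop starts only once a stream exists.
    type lazyStream struct {
        mu     sync.Mutex
        conn   *grpc.ClientConn
        client tikvpb.Tikv_BatchCommandsClient
    }

    func (l *lazyStream) init() error {
        l.mu.Lock()
        defer l.mu.Unlock()
        if l.client != nil {
            return nil // already established by an earlier send
        }
        stream, err := tikvpb.NewTikvClient(l.conn).BatchCommands(context.TODO())
        if err != nil {
            return err // the caller logs a warning and drops the batch
        }
        l.client = stream
        go func() {
            // Receive loop: started here, not at dial time.
            for {
                if _, err := stream.Recv(); err != nil {
                    log.Printf("recv loop exit: %v", err)
                    return
                }
            }
        }()
        return nil
    }

    func main() {
        conn, err := grpc.Dial("127.0.0.1:20160", grpc.WithInsecure()) // assumed address
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
        l := &lazyStream{conn: conn}
        if err := l.init(); err != nil {
            log.Printf("init batch client failed: %v", err)
        }
    }
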