diff --git a/store/tikv/client.go b/store/tikv/client.go index 4061ef269b1b3..64882a8b6d31e 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -148,23 +148,16 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint a.v[i] = conn if allowBatch { - // Initialize batch streaming clients. - tikvClient := tikvpb.NewTikvClient(conn) - streamClient, err := tikvClient.BatchCommands(context.TODO()) - if err != nil { - a.Close() - return errors.Trace(err) - } batchClient := &batchCommandsClient{ - target: a.target, - conn: conn, - client: streamClient, - batched: sync.Map{}, - idAlloc: 0, - closed: 0, + target: a.target, + conn: conn, + batched: sync.Map{}, + idAlloc: 0, + closed: 0, + tikvClientCfg: cfg.TiKVClient, + tikvLoad: &a.tikvTransportLayerLoad, } a.batchCommandsClients = append(a.batchCommandsClients, batchClient) - go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad) } } go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, done) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index cbcc68e6981be..d10ca55b68295 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -200,6 +200,9 @@ type batchCommandsClient struct { batched sync.Map idAlloc uint64 + tikvClientCfg config.TiKVClient + tikvLoad *uint64 + // closed indicates the batch client is closed explicitly or not. closed int32 // tryLock protects client when re-create the streaming. @@ -247,7 +250,6 @@ func (c *batchCommandsClient) failPendingRequests(err error) { func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error { c.failPendingRequests(err) // fail all pending requests. - // Re-establish a application layer stream. TCP layer is handled by gRPC. tikvClient := tikvpb.NewTikvClient(c.conn) streamClient, err := tikvClient.BatchCommands(context.TODO()) @@ -455,10 +457,34 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []* RequestIds: requestIDs, } + if err := cli.initBatchClient(); err != nil { + logutil.BgLogger().Warn( + "init create streaming fail", + zap.String("target", cli.target), + zap.Error(err), + ) + return + } + cli.send(req, entries) return } +func (c *batchCommandsClient) initBatchClient() error { + if c.client != nil { + return nil + } + // Initialize batch streaming clients. + tikvClient := tikvpb.NewTikvClient(c.conn) + streamClient, err := tikvClient.BatchCommands(context.TODO()) + if err != nil { + return errors.Trace(err) + } + c.client = streamClient + go c.batchRecvLoop(c.tikvClientCfg, c.tikvLoad) + return nil +} + func (a *batchConn) Close() { // Close all batchRecvLoop. for _, c := range a.batchCommandsClients {