Skip to content

Commit

Permalink
tikv: non-blocking establish superbatch connection with timeout (#12733)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and sre-bot committed Oct 18, 2019
1 parent 48c1571 commit 4c989b0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
21 changes: 7 additions & 14 deletions store/tikv/client.go
Expand Up @@ -148,23 +148,16 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
a.v[i] = conn

if allowBatch {
// Initialize batch streaming clients.
tikvClient := tikvpb.NewTikvClient(conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
if err != nil {
a.Close()
return errors.Trace(err)
}
batchClient := &batchCommandsClient{
target: a.target,
conn: conn,
client: streamClient,
batched: sync.Map{},
idAlloc: 0,
closed: 0,
target: a.target,
conn: conn,
batched: sync.Map{},
idAlloc: 0,
closed: 0,
tikvClientCfg: cfg.TiKVClient,
tikvLoad: &a.tikvTransportLayerLoad,
}
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad)
}
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, done)
Expand Down
28 changes: 27 additions & 1 deletion store/tikv/client_batch.go
Expand Up @@ -200,6 +200,9 @@ type batchCommandsClient struct {
batched sync.Map
idAlloc uint64

tikvClientCfg config.TiKVClient
tikvLoad *uint64

// closed indicates the batch client is closed explicitly or not.
closed int32
// tryLock protects client when re-create the streaming.
Expand Down Expand Up @@ -247,7 +250,6 @@ func (c *batchCommandsClient) failPendingRequests(err error) {

func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
c.failPendingRequests(err) // fail all pending requests.

// Re-establish a application layer stream. TCP layer is handled by gRPC.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
Expand Down Expand Up @@ -455,10 +457,34 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*
RequestIds: requestIDs,
}

if err := cli.initBatchClient(); err != nil {
logutil.BgLogger().Warn(
"init create streaming fail",
zap.String("target", cli.target),
zap.Error(err),
)
return
}

cli.send(req, entries)
return
}

func (c *batchCommandsClient) initBatchClient() error {
if c.client != nil {
return nil
}
// Initialize batch streaming clients.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
if err != nil {
return errors.Trace(err)
}
c.client = streamClient
go c.batchRecvLoop(c.tikvClientCfg, c.tikvLoad)
return nil
}

func (a *batchConn) Close() {
// Close all batchRecvLoop.
for _, c := range a.batchCommandsClients {
Expand Down

0 comments on commit 4c989b0

Please sign in to comment.