Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv: Fix the issue that KV requests might be processed slower because the connections to some KV servers are slow to establish. #12733

Merged
merged 5 commits into from Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 7 additions & 14 deletions store/tikv/client.go
Expand Up @@ -148,23 +148,16 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
a.v[i] = conn

if allowBatch {
// Initialize batch streaming clients.
tikvClient := tikvpb.NewTikvClient(conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
if err != nil {
a.Close()
return errors.Trace(err)
}
batchClient := &batchCommandsClient{
target: a.target,
conn: conn,
client: streamClient,
batched: sync.Map{},
idAlloc: 0,
closed: 0,
target: a.target,
conn: conn,
batched: sync.Map{},
idAlloc: 0,
closed: 0,
tikvClientCfg: cfg.TiKVClient,
tikvLoad: &a.tikvTransportLayerLoad,
}
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad)
}
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, done)
Expand Down
28 changes: 27 additions & 1 deletion store/tikv/client_batch.go
Expand Up @@ -200,6 +200,9 @@ type batchCommandsClient struct {
batched sync.Map
idAlloc uint64

tikvClientCfg config.TiKVClient
tikvLoad *uint64

// closed indicates the batch client is closed explicitly or not.
closed int32
// tryLock protects client when re-create the streaming.
Expand Down Expand Up @@ -247,7 +250,6 @@ func (c *batchCommandsClient) failPendingRequests(err error) {

func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
c.failPendingRequests(err) // fail all pending requests.

// Re-establish a application layer stream. TCP layer is handled by gRPC.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
Expand Down Expand Up @@ -455,10 +457,34 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*
RequestIds: requestIDs,
}

if err := cli.initBatchClient(); err != nil {
logutil.BgLogger().Warn(
"init create streaming fail",
zap.String("target", cli.target),
zap.Error(err),
)
return
}

cli.send(req, entries)
return
}

func (c *batchCommandsClient) initBatchClient() error {
if c.client != nil {
return nil
}
// Initialize batch streaming clients.
tikvClient := tikvpb.NewTikvClient(c.conn)
streamClient, err := tikvClient.BatchCommands(context.TODO())
if err != nil {
return errors.Trace(err)
}
c.client = streamClient
go c.batchRecvLoop(c.tikvClientCfg, c.tikvLoad)
return nil
}

func (a *batchConn) Close() {
// Close all batchRecvLoop.
for _, c := range a.batchCommandsClients {
Expand Down