Skip to content

Commit

Permalink
client: Keepalive pings should be sent every [Time] period (#3102)
Browse files Browse the repository at this point in the history
This commit makes the following changes:
- Keep track of the time of the last read in the transport.
- Use this in the keepalive implementation to decide when to send out
  keepalives.
- Address the issue of keepalives being sent every [Time+Timeout] period
  instead of every [Time] period; proposal A8 mandates the latter.

Proposal A8 is here:
https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
  • Loading branch information
easwars committed Oct 24, 2019
1 parent c0909e9 commit 0f2d539
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 26 deletions.
75 changes: 49 additions & 26 deletions internal/transport/http2_client.go
Expand Up @@ -47,6 +47,7 @@ import (

// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
lastRead int64 // keep this field 64-bit aligned
ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
Expand Down Expand Up @@ -76,9 +77,6 @@ type http2Client struct {

perRPCCreds []credentials.PerRPCCredentials

// Boolean to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
kp keepalive.ClientParameters
keepaliveEnabled bool

Expand Down Expand Up @@ -1240,7 +1238,7 @@ func (t *http2Client) reader() {
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
Expand All @@ -1255,7 +1253,7 @@ func (t *http2Client) reader() {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
if err != nil {
// Abort an active stream if the http2.Framer returns a
Expand Down Expand Up @@ -1299,17 +1297,41 @@ func (t *http2Client) reader() {
}
}

// minTime returns the shorter of the two durations a and b. When they are
// equal, that common value is returned.
func minTime(a, b time.Duration) time.Duration {
	shorter := b
	if a < b {
		shorter = a
	}
	return shorter
}

// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then.
outstandingPing := false
// Amount of time remaining before which we should receive an ACK for the
// last sent ping.
timeoutLeft := time.Duration(0)
// Records the last value of t.lastRead before we go block on the timer.
// This is required to check for read activity since then.
prevNano := time.Now().UnixNano()
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
lastRead := atomic.LoadInt64(&t.lastRead)
if lastRead > prevNano {
// There has been read activity since the last time we were here.
outstandingPing = false
// Next timer should fire at kp.Time seconds from lastRead time.
timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
prevNano = lastRead
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close()
return
}
t.mu.Lock()
if t.state == closing {
// If the transport is closing, we should exit from the
Expand All @@ -1322,36 +1344,37 @@ func (t *http2Client) keepalive() {
return
}
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// If a ping was sent out previously (because there were active
// streams at that point) which wasn't acked and its timeout
// hadn't fired, but we got here and are about to go dormant,
// we should make sure that we unconditionally send a ping once
// we awaken.
outstandingPing = false
t.kpDormant = true
t.kpDormancyCond.Wait()
}
t.kpDormant = false
t.mu.Unlock()

if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
// We get here either because we were dormant and a new stream was
// created which unblocked the Wait() call, or because the
// keepalive timer expired. In both cases, we need to send a ping.
t.controlBuf.put(p)

timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
infof("transport: closing client transport due to idleness.")
t.Close()
return
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
t.controlBuf.put(p)
timeoutLeft = t.kp.Timeout
outstandingPing = true
}
// The amount of time to sleep here is the minimum of kp.Time and
// timeoutLeft. This will ensure that we wait only for kp.Time
// before sending out the next ping (for cases where the ping is
// acked).
sleepDuration := minTime(t.kp.Time, timeoutLeft)
timeoutLeft -= sleepDuration
prevNano = lastRead
timer.Reset(sleepDuration)
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
Expand Down
47 changes: 47 additions & 0 deletions internal/transport/keepalive_test.go
Expand Up @@ -350,6 +350,53 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
}
}

// TestKeepaliveClientFrequency creates a server which expects at most 1 client
// ping for every 1.2 seconds, while the client is configured to send a ping
// every 1 second. So, this configuration should end up with the client
// transport being closed. But we had a bug wherein the client was sending one
// ping every [Time+Timeout] instead of every [Time] period, and this test
// explicitly makes sure the fix works and the client sends a ping every [Time]
// period.
func TestKeepaliveClientFrequency(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 1200 * time.Millisecond, // 1.2 seconds
PermitWithoutStream: true,
},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 2 * time.Second,
PermitWithoutStream: true,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
server.stop()
cancel()
}()

timeout := time.NewTimer(6 * time.Second)
select {
case <-client.Error():
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings)
}
case <-timeout.C:
t.Fatalf("client transport still healthy; expected GoAway from the server.")
}

// Make sure the client transport is not healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil {
t.Fatal("client.NewStream() should have failed, but succeeded")
}
}

// TestKeepaliveServerEnforcementWithAbusiveClientNoRPC verifies that the
// server closes a client transport when it sends too many keepalive pings
// (when there are no active streams), based on the configured
Expand Down

0 comments on commit 0f2d539

Please sign in to comment.