transport: client keepalives should be sent every [Time] period #3102

Merged · 5 commits · Oct 24, 2019

Changes from 2 commits
82 changes: 56 additions & 26 deletions internal/transport/http2_client.go
@@ -47,6 +47,7 @@ import (

// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
lr lastRead // keep this field 64-bit aligned
Member:
Make this int64 and delete the struct?

Contributor Author:
Done.

ctx context.Context
cancel context.CancelFunc
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
@@ -76,9 +77,6 @@ type http2Client struct {

perRPCCreds []credentials.PerRPCCredentials

// Boolean to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
kp keepalive.ClientParameters
keepaliveEnabled bool

@@ -130,6 +128,14 @@ type http2Client struct {
bufferPool *bufferPool
}

type lastRead struct {
// Stores the Unix time in nanoseconds. This time cannot be directly
// embedded in the http2Client struct because this field is accessed using
// functions from the atomic package. And on 32-bit machines, it is the
// caller's responsibility to arrange for 64-bit alignment of this field.
timeNano int64
}
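
In light of the review exchange above (flatten the wrapper struct into a plain int64 field), here is a minimal sketch of what that could look like; the field name and comment wording are assumptions, not the merged code:

    type http2Client struct {
        // lastRead is the Unix time (nanoseconds) of the most recent read on the
        // transport. It is accessed atomically and kept as the first field so it
        // stays 64-bit aligned on 32-bit platforms, as sync/atomic requires.
        lastRead int64

        ctx     context.Context
        cancel  context.CancelFunc
        ctxDone <-chan struct{} // Cache the ctx.Done() chan.
        // ... remaining fields unchanged ...
    }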

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
@@ -1240,7 +1246,7 @@ func (t *http2Client) reader() {
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
atomic.StoreInt64(&t.lr.timeNano, time.Now().UnixNano())
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
@@ -1255,7 +1261,7 @@ func (t *http2Client) reader() {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
atomic.StoreInt64(&t.lr.timeNano, time.Now().UnixNano())
}
if err != nil {
// Abort an active stream if the http2.Framer returns a
@@ -1299,17 +1305,40 @@ func (t *http2Client) reader() {
}
}

func minTime(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then.
outstandingPing := false
// Amount of time remaining before which we should receive an ACK for the
// last sent ping.
timeoutLeft := time.Duration(0)
// UnixNanos recorded before we block on the timer. This is required to
// check for read activity since then.
prevNano := time.Now().UnixNano()
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
if lastRead := atomic.LoadInt64(&t.lr.timeNano); lastRead > prevNano {
// There has been read activity since the last time we were here.
outstandingPing = false
prevNano = time.Now().UnixNano()
// Next timer should fire at kp.Time seconds from lastRead time.
timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(prevNano))
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close()
return
}
t.mu.Lock()
if t.state == closing {
// If the transport is closing, we should exit from the
@@ -1322,36 +1351,37 @@ func (t *http2Client) keepalive() {
return
}
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// If a ping was sent out previously (because there were active
// streams at that point) which wasn't acked and it's timeout
Member:
Language nit: it's -> its

Contributor Author:
Done.

// hadn't fired, but we got here and are about to go dormant,
// we should make sure that we unconditionally send a ping once
// we awaken.
outstandingPing = false
t.kpDormant = true
t.kpDormancyCond.Wait()
}
t.kpDormant = false
t.mu.Unlock()

if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
// We get here either because we were dormant and a new stream was
// created which unblocked the Wait() call, or because the
// keepalive timer expired. In both cases, we need to send a ping.
t.controlBuf.put(p)

timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
infof("transport: closing client transport due to idleness.")
t.Close()
return
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
t.controlBuf.put(p)
timeoutLeft = t.kp.Timeout
outstandingPing = true
}
// The amount of time to sleep here is the minimum of kp.Time and
// timeoutLeft. This will ensure that we wait only for kp.Time
// before sending out the next ping (for cases where the ping is
// acked).
sleepDuration := minTime(t.kp.Time, timeoutLeft)
timeoutLeft -= sleepDuration
prevNano = time.Now().UnixNano()
Member:
Why do we do this here? prevNano is used to compare against lastRead. If lastRead is updated between line 1330 and here, we will fail to count that activity.

Consider a crazy race where the ping sent on 1373 is ack'd and that activity is recorded before the code gets here. Yes this is extremely unlikely, but it's theoretically possible which means something is wrong.

Should prevNano be set to the lastRead time (always, including line 1333)?

Contributor Author:
I think it makes sense to set prevNano to lastRead all the time.

On 1336: where the timer has fired but we have had read activity since the last time we were here, it definitely makes sense to set prevNano to lastRead, because we are effectively setting the timer to fire kp.Time seconds after lastRead. So, the next time we get here, we should check whether there has been read activity since then. This also eliminates the extremely unlikely race of read activity occurring after our atomic read of t.lr.timeNano but before the assignment of prevNano to time.Now() here (with no further read activity until the timer fires the next time).

Here around 1384 also, it makes sense to set it to lastRead. It took me some time to convince myself that this works also for the case where we have just come out of dormancy and have sent out a ping, but I feel convinced now. Please let me know if you feel otherwise.

Thanks for this comment.
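
A rough sketch of the change agreed on in this thread — anchoring prevNano to lastRead rather than time.Now() at the reset sites; this reflects the discussion above, not the code in this revision of the diff:

    if lastRead := atomic.LoadInt64(&t.lr.timeNano); lastRead > prevNano {
        // There has been read activity since the last time we were here.
        outstandingPing = false
        // Next timer should fire kp.Time after the lastRead time.
        timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
        prevNano = lastRead // instead of prevNano = time.Now().UnixNano()
        continue
    }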

timer.Reset(sleepDuration)
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
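Because this view interleaves removed and added lines, the following is a condensed reading of the new keepalive loop assembled from the hunks above (locking, closing-state check, and dormancy handling elided); it is an editorial sketch, not an exact copy of the file:

    func (t *http2Client) keepalive() {
        p := &ping{data: [8]byte{}}
        outstandingPing := false          // a ping was sent and no data read since
        timeoutLeft := time.Duration(0)   // time left to receive the ping's ACK
        prevNano := time.Now().UnixNano() // read-activity watermark
        timer := time.NewTimer(t.kp.Time)
        for {
            select {
            case <-timer.C:
                if lastRead := atomic.LoadInt64(&t.lr.timeNano); lastRead > prevNano {
                    // Read activity since we last checked: no ping needed yet.
                    outstandingPing = false
                    prevNano = time.Now().UnixNano()
                    timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(prevNano))
                    continue
                }
                if outstandingPing && timeoutLeft <= 0 {
                    t.Close() // ping was not acked in time
                    return
                }
                // (mutex, closing-state check, and dormancy handling elided)
                if !outstandingPing {
                    if channelz.IsOn() {
                        atomic.AddInt64(&t.czData.kpCount, 1)
                    }
                    t.controlBuf.put(p)
                    timeoutLeft = t.kp.Timeout
                    outstandingPing = true
                }
                // Sleep for min(kp.Time, timeoutLeft) so pings go out every
                // kp.Time when they are acked, and the timeout still fires.
                sleepDuration := minTime(t.kp.Time, timeoutLeft)
                timeoutLeft -= sleepDuration
                prevNano = time.Now().UnixNano()
                timer.Reset(sleepDuration)
            case <-t.ctx.Done():
                if !timer.Stop() {
                    <-timer.C
                }
                return
            }
        }
    }

The key difference from the old loop is that the next ping is scheduled relative to the last read, so on a healthy connection with acked pings the client sends one ping per kp.Time rather than one per kp.Time plus kp.Timeout.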
46 changes: 46 additions & 0 deletions internal/transport/keepalive_test.go
@@ -350,6 +350,52 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
}
}

// TestKeepaliveClientFrequency creates a server which expects at most 2 client
// pings for every 2.1 seconds, while the client is configured to send a ping
Member:
This text needs to be updated in light of the number changes.

Contributor Author:
Done.

// every 1 second. So, this configuration should end up with the client
// transport being closed. But we had a bug wherein the client was sending one
// ping every [Time+Timeout] instead of every [Time] period, and this test
// explicitly makes sure the fix works and the client sends a ping every [Time]
// period.
func TestKeepaliveClientFrequency(t *testing.T) {
serverConfig := &ServerConfig{
KeepalivePolicy: keepalive.EnforcementPolicy{
MinTime: 2100 * time.Millisecond, // 2.1 seconds
Member:
Can this be even more aggressive, like 1.1s? Or will it be flaky (because Travis)?

Contributor Author:
I was initially mistaken to think that the server waits for 2 pingStrikes within the configured MinTime before sending a GOAWAY.
Changed it to 1.2 seconds, and the test passed with the race detector for 50 runs.

},
}
clientOptions := ConnectOptions{
KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 2 * time.Second,
PermitWithoutStream: true,
},
}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
defer func() {
client.Close()
server.stop()
cancel()
}()

timeout := time.NewTimer(6 * time.Second)
select {
case <-client.Error():
if !timeout.Stop() {
<-timeout.C
}
if reason := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayTooManyPings)
}
case <-timeout.C:
t.Fatalf("client transport still healthy; expected GoAway from the server.")
}

// Make sure the client transport is not healthy.
if _, err := client.NewStream(context.Background(), &CallHdr{}); err == nil {
t.Fatal("client.NewStream() should have failed, but succeeded")
}
}
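
For context, this is roughly how an application opts into the client keepalives that this logic drives; the target address and durations are illustrative only, and Time should stay above the server's keepalive.EnforcementPolicy.MinTime to avoid a "too many pings" GOAWAY:

    package main

    import (
        "log"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/keepalive"
    )

    func main() {
        conn, err := grpc.Dial("localhost:50051",
            grpc.WithInsecure(),
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time:                10 * time.Second, // ping after 10s without read activity
                Timeout:             3 * time.Second,  // close if the ping is not acked in 3s
                PermitWithoutStream: true,             // ping even when no RPCs are active
            }),
        )
        if err != nil {
            log.Fatalf("grpc.Dial: %v", err)
        }
        defer conn.Close()
    }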

// TestKeepaliveServerEnforcementWithAbusiveClientNoRPC verifies that the
// server closes a client transport when it sends too many keepalive pings
// (when there are no active streams), based on the configured