From c0a84df2924144e7f63306a33d89271960851d41 Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Fri, 19 Apr 2019 11:27:13 -0700
Subject: [PATCH] Align the keepalive implementation with proposal A8.

This commit makes the following changes:
* Keep track of the time of the last read in the transport.
* Use this in the keepalive implementation to decide when to send out
  keepalives.
* Address the issue of keepalives being sent every [Time+Timeout] period
  instead of every [Time] period, as mandated by proposal A8.
* Make many of the transport tests run in parallel (as most of them spend
  a lot of time just sleeping, waiting for things to happen).

Proposal A8 is here:
https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md

This commit addresses:
https://github.com/grpc/grpc-go/issues/2638
---
 internal/transport/http2_client.go   | 226 +++++++++++++++++++++++----
 internal/transport/transport_test.go |  81 +++++++---
 2 files changed, 255 insertions(+), 52 deletions(-)
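
The sketch below is a minimal, self-contained illustration of the approach this
patch takes; it is not the http2Client code itself. The keepaliveConn type, its
onRead/sendPing/closeConn hooks, and the field names are invented for the
example, and dormancy (PermitWithoutStream), context cancellation, and the
control buffer are omitted. It shows the A8 behaviour the commit message
describes: record the Unix time of every read atomically, and have the
keepalive loop measure silence from that timestamp, so a ping goes out after
kp.Time of quiet rather than every kp.Time+kp.Timeout.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// keepaliveConn is a toy stand-in for a transport. It records the time of the
// last successful read and runs a keepalive loop against that timestamp.
type keepaliveConn struct {
	lastReadNano int64 // Unix nanos of the last read; accessed atomically.
	kpTime       time.Duration
	kpTimeout    time.Duration
	sendPing     func()
	closeConn    func()
}

// onRead is called whenever data arrives on the connection.
func (c *keepaliveConn) onRead() {
	atomic.StoreInt64(&c.lastReadNano, time.Now().UnixNano())
}

// keepalive sends a ping only once kpTime of silence has passed since the
// last read, and closes the connection if no read arrives within kpTimeout of
// that ping. On an idle connection pings therefore go out every kpTime.
func (c *keepaliveConn) keepalive() {
	var (
		outstandingPing bool
		pingSentAt      time.Time
	)
	for {
		lastRead := time.Unix(0, atomic.LoadInt64(&c.lastReadNano))
		if outstandingPing && lastRead.After(pingSentAt) {
			// Data arrived after the ping went out; treat the ping as answered.
			outstandingPing = false
		}
		if outstandingPing {
			deadline := pingSentAt.Add(c.kpTimeout)
			if !time.Now().Before(deadline) {
				// No read activity within kpTimeout of the ping: give up.
				c.closeConn()
				return
			}
			time.Sleep(time.Until(deadline))
			continue
		}
		// No ping outstanding: wait until a full kpTime of silence has passed
		// since the last read, then send one.
		if quiet := time.Since(lastRead); quiet < c.kpTime {
			time.Sleep(c.kpTime - quiet)
			continue
		}
		c.sendPing()
		outstandingPing = true
		pingSentAt = time.Now()
	}
}

func main() {
	c := &keepaliveConn{
		kpTime:    2 * time.Second,
		kpTimeout: 1 * time.Second,
		sendPing:  func() { fmt.Println("ping") },
		closeConn: func() { fmt.Println("close") },
	}
	c.onRead() // pretend the connection just saw data
	go c.keepalive()
	time.Sleep(4 * time.Second) // with no further reads: "ping" at ~2s, "close" at ~3s
}
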
diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go
index 39ab5c075a6..95f446067ca 100644
--- a/internal/transport/http2_client.go
+++ b/internal/transport/http2_client.go
@@ -77,11 +77,9 @@ type http2Client struct {
 	perRPCCreds []credentials.PerRPCCredentials
 
-	// Boolean to keep track of reading activity on transport.
-	// 1 is true and 0 is false.
-	activity uint32 // Accessed atomically.
 	kp               keepalive.ClientParameters
 	keepaliveEnabled bool
+	lr               lastRead
 
 	statsHandler stats.Handler
 
@@ -121,6 +119,16 @@ type http2Client struct {
 	bufferPool *bufferPool
 }
 
+type lastRead struct {
+	// Stores the Unix time in nanoseconds. This time cannot be embedded directly
+	// in the http2Client struct because the field is accessed using functions
+	// from the atomic package, and on 32-bit machines it is the caller's
+	// responsibility to arrange for 64-bit alignment of such fields.
+	timeNano int64
+	// Channel to keep track of read activity on the transport.
+	ch chan struct{}
+}
+
 func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
 	if fn != nil {
 		return fn(ctx, addr)
@@ -252,6 +260,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
 		onClose:              onClose,
 		keepaliveEnabled:     keepaliveEnabled,
 		bufferPool:           newBufferPool(),
+		lr:                   lastRead{ch: make(chan struct{}, 1)},
 	}
 	t.controlBuf = newControlBuffer(t.ctxDone)
 	if opts.InitialWindowSize >= defaultWindowSize {
@@ -281,6 +290,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
 		t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
 	}
 	if t.keepaliveEnabled {
+		go t.activityMonitor()
 		go t.keepalive()
 	}
 	// Start the reader goroutine for incoming message. Each transport has
@@ -1233,7 +1243,7 @@ func (t *http2Client) reader() {
 	}
 	t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
 	if t.keepaliveEnabled {
-		atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+		t.lr.ch <- struct{}{}
 	}
 	sf, ok := frame.(*http2.SettingsFrame)
 	if !ok {
@@ -1248,7 +1258,10 @@ func (t *http2Client) reader() {
 		t.controlBuf.throttle()
 		frame, err := t.framer.fr.ReadFrame()
 		if t.keepaliveEnabled {
-			atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+			select {
+			case t.lr.ch <- struct{}{}:
+			default:
+			}
 		}
 		if err != nil {
 			// Abort an active stream if the http2.Framer returns a
@@ -1292,17 +1305,54 @@ func (t *http2Client) reader() {
 	}
 }
 
-// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
+// activityMonitor reads from the activity channel (which is written to when
+// there is a read), and updates the lastRead.timeNano atomic.
+func (t *http2Client) activityMonitor() {
+	for {
+		select {
+		case <-t.lr.ch:
+			atomic.StoreInt64(&t.lr.timeNano, time.Now().UnixNano())
+		case <-t.ctx.Done():
+			return
+		}
+	}
+}
+
+func minTime(a, b time.Duration) time.Duration {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+// keepalive running in a separate goroutine makes sure the connection is alive
+// by sending pings.
 func (t *http2Client) keepalive() {
 	p := &ping{data: [8]byte{}}
-	timer := time.NewTimer(t.kp.Time)
+	// True iff a PING has been sent, and no data has been received since then
+	// and the PING hasn't timed out.
+	var outstandingPing bool
+	// Amount of time remaining before which we should receive an ACK for the
+	// last sent PING.
+	var timeoutLeft time.Duration
+	// Unix nanos recorded before we block on the timer. This is required to
+	// check for read activity since then.
+	var prevNano int64
	for {
-		select {
-		case <-timer.C:
-			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
-				timer.Reset(t.kp.Time)
-				continue
-			}
+		dataRead := false
+		if prevNano < atomic.LoadInt64(&t.lr.timeNano) {
+			// Read activity since the last time we were here.
+			outstandingPing = false
+			dataRead = true
+		}
+
+		// Outstanding PING timed out, we are done.
+		if outstandingPing && timeoutLeft <= 0 {
+			t.Close()
+			return
+		}
+
+		if !dataRead {
 			// Check if keepalive should go dormant.
 			t.mu.Lock()
 			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
@@ -1313,35 +1363,42 @@ func (t *http2Client) keepalive() {
 				case <-t.awakenKeepalive:
 					// If the control gets here a ping has been sent
 					// need to reset the timer with keepalive.Timeout.
+					timeoutLeft = t.kp.Timeout
+					outstandingPing = true
 				case <-t.ctx.Done():
 					return
 				}
 			} else {
 				t.mu.Unlock()
-				if channelz.IsOn() {
-					atomic.AddInt64(&t.czData.kpCount, 1)
+				if !outstandingPing {
+					if channelz.IsOn() {
+						atomic.AddInt64(&t.czData.kpCount, 1)
+					}
+					t.controlBuf.put(p)
+					timeoutLeft = t.kp.Timeout
+					outstandingPing = true
 				}
-				// Send ping.
-				t.controlBuf.put(p)
 			}
+		}
-		// By the time control gets here a ping has been sent one way or the other.
-		timer.Reset(t.kp.Timeout)
-		select {
-		case <-timer.C:
-			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
-				timer.Reset(t.kp.Time)
-				continue
-			}
-			infof("transport: closing client transport due to idleness.")
-			t.Close()
-			return
-		case <-t.ctx.Done():
-			if !timer.Stop() {
-				<-timer.C
-			}
-			return
-		}
+		// Amount of kp.Time remaining should be calculated from the time of the
+		// last read activity.
+		timeLeft := t.kp.Time
+		if dataRead {
+			timeLeft = time.Duration(atomic.LoadInt64(&t.lr.timeNano)) + t.kp.Time - time.Duration(time.Now().UTC().UnixNano())
+		}
+		// If a PING is outstanding, the amount of time to sleep here should be the
+		// minimum of timeoutLeft and timeLeft.
+		sleepDuration := timeLeft
+		if outstandingPing {
+			sleepDuration = minTime(timeLeft, timeoutLeft)
+			timeoutLeft -= sleepDuration
+		}
+
+		prevNano = time.Now().UTC().UnixNano()
+		timer := time.NewTimer(sleepDuration)
+		select {
+		case <-timer.C:
 		case <-t.ctx.Done():
 			if !timer.Stop() {
 				<-timer.C
 			}
@@ -1351,6 +1408,106 @@ func (t *http2Client) keepalive() {
 	}
 }
 
+/*
+// pinger sends out a PING frame, and waits for one of the following events to
+// happen: kp.Timeout expires, the top-level context is done, or the done
+// channel is written to (this usually happens when there is read activity).
+func (t *http2Client) pinger(send bool, done chan struct{}) {
+	if send {
+		if channelz.IsOn() {
+			atomic.AddInt64(&t.czData.kpCount, 1)
+		}
+		t.controlBuf.put(&ping{data: [8]byte{}})
+	}
+
+	timer := time.NewTimer(t.kp.Timeout)
+	select {
+	case <-timer.C:
+		t.Close()
+		return
+	case <-t.ctx.Done():
+		if !timer.Stop() {
+			<-timer.C
+		}
+		return
+	case <-done:
+		if !timer.Stop() {
+			<-timer.C
+		}
+		return
+	}
+}
+
+// keepalive running in a separate goroutine makes sure the connection is alive
+// by sending pings.
+func (t *http2Client) keepalive() {
+	go t.activityMonitor()
+
+	ticker := time.NewTicker(defaultKeepaliveTickerDuration)
+	defer ticker.Stop()
+
+	var done chan struct{}
+	for {
+		select {
+		case <-ticker.C:
+			// If the amount of time elapsed since the last read is less than kp.Time
+			// and we have an active pinger routine, we need to cancel it.
+			lr := time.Since(time.Unix(atomic.LoadInt64(&t.lr.time), 0))
+			if lr < t.kp.Time {
+				if done != nil {
+					close(done)
+					done = nil
+				}
+				continue
+			}
+			// If the control gets here, it means more than kp.Time has elapsed since
+			// the last read, and we need to do one of the following:
+			// - If keepalive dormancy conditions are met, then go dormant.
+			// - If there is an active pinger routine, then do nothing.
+			// - If dormancy conditions are not met and there is no active pinger
+			//   routine, start one.
+			sendPing := true
+			goDormant := func() bool {
+				t.mu.Lock()
+				defer t.mu.Unlock()
+				if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
+					// If pinger routine is active, it probably means that one was started
+					// when there was an active stream, but now there is none, and
+					// therefore we should stop the pinger.
+					if done != nil {
+						close(done)
+						done = nil
+					}
+					// Make awakenKeepalive writable.
+					<-t.awakenKeepalive
+					return true
+				}
+				return false
+			}()
+			if goDormant {
+				select {
+				case <-t.awakenKeepalive:
+					// If the control gets here, it means that a ping was sent right
+					// after stream creation. We need to make sure we get an ack for it
+					// before kp.Timeout expires.
+					sendPing = false
+				case <-t.ctx.Done():
+					return
+				}
+			}
+			// Active pinger exists, so there is nothing much to do here.
+			if done != nil {
+				continue
+			}
+			done = make(chan struct{})
+			go t.pinger(sendPing, done)
+		case <-t.ctx.Done():
+			return
+		}
+	}
+}
+*/
+
 func (t *http2Client) Error() <-chan struct{} {
 	return t.ctx.Done()
 }
diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go
index a3a34319300..341e56009ce 100644
--- a/internal/transport/transport_test.go
+++ b/internal/transport/transport_test.go
@@ -462,6 +462,8 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
 // TestInflightStreamClosing ensures that closing in-flight stream
 // sends status error to concurrent stream reader.
 func TestInflightStreamClosing(t *testing.T) {
+	t.Parallel()
+
 	serverConfig := &ServerConfig{}
 	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
 	defer cancel()
@@ -501,6 +503,8 @@ func TestInflightStreamClosing(t *testing.T) {
 // An idle client is one who doesn't make any RPC calls for a duration of
 // MaxConnectionIdle time.
 func TestMaxConnectionIdle(t *testing.T) {
+	t.Parallel()
+
 	serverConfig := &ServerConfig{
 		KeepaliveParams: keepalive.ServerParameters{
 			MaxConnectionIdle: 2 * time.Second,
@@ -529,6 +533,8 @@ func TestMaxConnectionIdle(t *testing.T) {
 
 // TestMaxConenctionIdleNegative tests that a server will not send GoAway to a non-idle(busy) client.
 func TestMaxConnectionIdleNegative(t *testing.T) {
+	t.Parallel()
+
 	serverConfig := &ServerConfig{
 		KeepaliveParams: keepalive.ServerParameters{
 			MaxConnectionIdle: 2 * time.Second,
@@ -556,6 +562,8 @@ func TestMaxConnectionIdleNegative(t *testing.T) {
 
 // TestMaxConnectionAge tests that a server will send GoAway after a duration of MaxConnectionAge.
 func TestMaxConnectionAge(t *testing.T) {
+	t.Parallel()
+
 	serverConfig := &ServerConfig{
 		KeepaliveParams: keepalive.ServerParameters{
 			MaxConnectionAge: 2 * time.Second,
@@ -588,6 +596,8 @@ const (
 
 // TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings.
 func TestKeepaliveServer(t *testing.T) {
+	t.Parallel()
+
 	serverConfig := &ServerConfig{
 		KeepaliveParams: keepalive.ServerParameters{
 			Time: 2 * time.Second,
@@ -632,6 +642,8 @@ func TestKeepaliveServer(t *testing.T) {
 
 // TestKeepaliveServerNegative tests that a server doesn't close connection with a client that responds to keepalive pings.
 func TestKeepaliveServerNegative(t *testing.T) {
+	t.Parallel()
+
 	serverConfig := &ServerConfig{
 		KeepaliveParams: keepalive.ServerParameters{
 			Time: 2 * time.Second,
@@ -653,6 +665,8 @@ func TestKeepaliveServerNegative(t *testing.T) {
 }
 
 func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
+	t.Parallel()
+
 	done := make(chan net.Conn, 1)
 	tr, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
 		Time:    2 * time.Second, // Keepalive time = 2 sec.
@@ -677,6 +691,8 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
 }
 
 func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
+	t.Parallel()
+
 	done := make(chan net.Conn, 1)
 	tr, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
 		Time:    2 * time.Second, // Keepalive time = 2 sec.
@@ -700,6 +716,8 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
 }
 
 func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
+	t.Parallel()
+
 	done := make(chan net.Conn, 1)
 	tr, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
 		Time:    2 * time.Second, // Keepalive time = 2 sec.
@@ -728,6 +746,8 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
 }
 
 func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
+	t.Parallel()
+
 	s, tr, cancel := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
 		Time:    2 * time.Second, // Keepalive time = 2 sec.
 		Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
@@ -747,16 +767,18 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
 }
 
 func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
+	t.Parallel()
+
 	serverConfig := &ServerConfig{
 		KeepalivePolicy: keepalive.EnforcementPolicy{
-			MinTime: 2 * time.Second,
+			MinTime: 5 * time.Second,
 		},
 	}
 	clientOptions := ConnectOptions{
 		KeepaliveParams: keepalive.ClientParameters{
-			Time:                50 * time.Millisecond,
-			Timeout:             1 * time.Second,
-			PermitWithoutStream: true,
+			Time:                2 * time.Second, // Keepalive time = 2 sec.
+			Timeout:             1 * time.Second, // Keepalive timeout = 1 sec.
+			PermitWithoutStream: true,            // Run keepalive even with no RPCs.
 		},
 	}
 	server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
@@ -782,15 +804,17 @@ func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
 }
 
 func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
+	t.Parallel()
+
 	serverConfig := &ServerConfig{
 		KeepalivePolicy: keepalive.EnforcementPolicy{
-			MinTime: 2 * time.Second,
+			MinTime: 5 * time.Second,
 		},
 	}
 	clientOptions := ConnectOptions{
 		KeepaliveParams: keepalive.ClientParameters{
-			Time:    50 * time.Millisecond,
-			Timeout: 1 * time.Second,
+			Time:    2 * time.Second, // Keepalive time = 2 sec.
+			Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
 		},
 	}
 	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
@@ -819,16 +843,18 @@ func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
 }
 
 func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
+	t.Parallel()
+
 	serverConfig := &ServerConfig{
 		KeepalivePolicy: keepalive.EnforcementPolicy{
-			MinTime:             100 * time.Millisecond,
+			MinTime:             1 * time.Second,
 			PermitWithoutStream: true,
 		},
 	}
 	clientOptions := ConnectOptions{
 		KeepaliveParams: keepalive.ClientParameters{
-			Time:                101 * time.Millisecond,
-			Timeout:             1 * time.Second,
+			Time:                2 * time.Second,
+			Timeout:             5 * time.Second,
 			PermitWithoutStream: true,
 		},
 	}
@@ -838,7 +864,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
 	defer client.Close()
 
 	// Give keepalive enough time.
-	time.Sleep(3 * time.Second)
+	time.Sleep(10 * time.Second)
 	// Assert that connection is healthy.
 	client.mu.Lock()
 	defer client.mu.Unlock()
@@ -848,15 +874,17 @@ func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
 }
 
 func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
+	t.Parallel()
+
 	serverConfig := &ServerConfig{
 		KeepalivePolicy: keepalive.EnforcementPolicy{
-			MinTime: 100 * time.Millisecond,
+			MinTime: 1 * time.Second,
 		},
 	}
 	clientOptions := ConnectOptions{
 		KeepaliveParams: keepalive.ClientParameters{
-			Time:    101 * time.Millisecond,
-			Timeout: 1 * time.Second,
+			Time:    2 * time.Second,
+			Timeout: 5 * time.Second,
 		},
 	}
 	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
@@ -869,7 +897,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
 	}
 
 	// Give keepalive enough time.
-	time.Sleep(3 * time.Second)
+	time.Sleep(10 * time.Second)
 	// Assert that connection is healthy.
 	client.mu.Lock()
 	defer client.mu.Unlock()
@@ -951,18 +979,35 @@ func performOneRPC(ct ClientTransport) {
 func TestClientMix(t *testing.T) {
 	s, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
 	defer cancel()
+	done := make(chan struct{})
+
 	go func(s *server) {
-		time.Sleep(5 * time.Second)
+		select {
+		case <-done:
+		case <-time.After(5 * time.Second):
+		}
 		s.stop()
 	}(s)
+
 	go func(ct ClientTransport) {
-		<-ct.Error()
+		select {
+		case <-done:
+		case <-ct.Error():
+		}
 		ct.Close()
 	}(ct)
+
+	var wg sync.WaitGroup
 	for i := 0; i < 1000; i++ {
 		time.Sleep(10 * time.Millisecond)
-		go performOneRPC(ct)
+		wg.Add(1)
+		go func() {
+			performOneRPC(ct)
+			wg.Done()
+		}()
 	}
+	wg.Wait()
+	close(done)
 }
 
 func TestLargeMessage(t *testing.T) {
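
On the lastRead.timeNano comment in the first hunk: the sync/atomic
documentation states that on 32-bit platforms the caller must arrange for
64-bit alignment of 64-bit words accessed atomically, and that the first word
in an allocated struct, array, or slice can be relied upon to be aligned. A
small illustration of that rule follows; the stats type and its fields are
invented for the example, and keeping the atomically accessed field first is
simply the usual way to satisfy the documented requirement.

package main

import (
	"fmt"
	"sync/atomic"
)

// stats keeps its atomically accessed int64 as the first field so that it is
// 64-bit aligned even on 32-bit platforms, where a misaligned 64-bit atomic
// operation panics at runtime.
type stats struct {
	bytesRead int64 // accessed atomically; keep as the first field
	name      string
}

func main() {
	s := &stats{name: "demo"}
	atomic.AddInt64(&s.bytesRead, 42)
	fmt.Println(atomic.LoadInt64(&s.bytesRead))
}
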
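The test-side changes are mostly mechanical: calling t.Parallel() at the top of
a test tells the testing package to run it concurrently with the other parallel
tests in the package, so the multi-second sleeps in these keepalive tests
overlap instead of adding up. A stand-alone illustration, with invented test
names and durations:

package keepalive_test

import (
	"testing"
	"time"
)

// Each test sleeps for a second, but because both call t.Parallel they run
// concurrently and `go test` finishes in roughly one second instead of two.
func TestSleepyOne(t *testing.T) {
	t.Parallel()
	time.Sleep(1 * time.Second)
}

func TestSleepyTwo(t *testing.T) {
	t.Parallel()
	time.Sleep(1 * time.Second)
}
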
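The TestClientMix rewrite replaces a fixed five-second sleep with a done
channel and waits for its worker goroutines with a sync.WaitGroup, so the
helper goroutines exit as soon as the RPCs finish. The same shape in isolation,
with performWork standing in for performOneRPC:

package main

import (
	"fmt"
	"sync"
	"time"
)

// performWork stands in for one RPC.
func performWork(i int) { time.Sleep(10 * time.Millisecond) }

func main() {
	done := make(chan struct{})

	// Helper that previously would have slept for a fixed 5 seconds; the done
	// channel lets it move on as soon as the real work has finished.
	var helper sync.WaitGroup
	helper.Add(1)
	go func() {
		defer helper.Done()
		select {
		case <-done:
		case <-time.After(5 * time.Second):
		}
		fmt.Println("helper: shutting down")
	}()

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			performWork(i)
		}(i)
	}
	wg.Wait()   // all workers finished
	close(done) // release the helper immediately
	helper.Wait()
}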