Skip to content

Commit

Permalink
Align the keepalive implementation with proposal A8.
Browse files Browse the repository at this point in the history
This commit makes the following changes:
* Keep track of the time of the last read in the transport.
* Use this in the keepalive implementation to decide when to send out
  keepalives.
* Address the issue of keepalives being sent every [Time+Timeout] period
  instead of every [Time] period, as mandated by proposal A8.
* Makes many of the transport tests to run in parallel (as most of them
  spend a lot of time just sleeping, waiting for things to happen).

Proposal A8 is here:
https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md

This commit addresses:
grpc#2638
  • Loading branch information
easwars committed Aug 20, 2019
1 parent 3bb34e5 commit c0a84df
Show file tree
Hide file tree
Showing 2 changed files with 255 additions and 52 deletions.
226 changes: 192 additions & 34 deletions internal/transport/http2_client.go
Expand Up @@ -77,11 +77,9 @@ type http2Client struct {

perRPCCreds []credentials.PerRPCCredentials

// Boolean to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
kp keepalive.ClientParameters
keepaliveEnabled bool
lr lastRead

statsHandler stats.Handler

Expand Down Expand Up @@ -121,6 +119,16 @@ type http2Client struct {
bufferPool *bufferPool
}

type lastRead struct {
// Stores the Unix time in nanoseconds. This time cannot be directly embedded
// in the http2Client struct because this field is accessed using functions
// from the atomic package. And on 32-bit machines, it is the caller's
// responsibility to arrange for 64-bit alignment of this field.
timeNano int64
// Channel to keep track of read activity on the transport.
ch chan struct{}
}

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
Expand Down Expand Up @@ -252,6 +260,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
onClose: onClose,
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
activityCh: make(chan struct{}, 1),
lr: lastRead{ch: make(chan struct{}, 1)},
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
Expand Down Expand Up @@ -281,6 +291,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
}
if t.keepaliveEnabled {
go t.activityMonitor()
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
Expand Down Expand Up @@ -1233,7 +1244,7 @@ func (t *http2Client) reader() {
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
t.lr.ch <- struct{}{}
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
Expand All @@ -1248,7 +1259,10 @@ func (t *http2Client) reader() {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
select {
case t.lr.ch <- struct{}{}:
default:
}
}
if err != nil {
// Abort an active stream if the http2.Framer returns a
Expand Down Expand Up @@ -1292,17 +1306,54 @@ func (t *http2Client) reader() {
}
}

// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
// activityMonitory reads from the activity channel (which is written to, when
// there is a read), and updates the lastRead.timeNano atomic.
func (t *http2Client) activityMonitor() {
for {
select {
case <-t.lr.ch:
atomic.StoreInt64(&t.lr.timeNano, time.Now().UnixNano())
case <-t.ctx.Done():
return
}
}
}

func minTime(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

// keepalive running in a separate goroutune makes sure the connection is alive
// by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
timer := time.NewTimer(t.kp.Time)
// True iff a PING has been sent, and no data has been received since then
// and the PING hasn't timed out.
var outstandingPing bool
// Amount of time remaining before which we should receive an ACK for the
// last sent PING.
var timeoutLeft time.Duration
// UnixNanos recorded before we go block on the timer. This is required to
// check for read activity since then.
var prevNano int64
for {
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
dataRead := false
if prevNano < atomic.LoadInt64(&t.lr.timeNano) {
// Read activity since the last time we were here.
outstandingPing = false
dataRead = true
}

// Outstanding PING timed out, we are done.
if outstandingPing && timeoutLeft <= 0 {
t.Close()
return
}

if !dataRead {
// Check if keepalive should go dormant.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
Expand All @@ -1313,35 +1364,42 @@ func (t *http2Client) keepalive() {
case <-t.awakenKeepalive:
// If the control gets here a ping has been sent
// need to reset the timer with keepalive.Timeout.
timeoutLeft = t.kp.Timeout
outstandingPing = true
case <-t.ctx.Done():
return
}
} else {
t.mu.Unlock()
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
t.controlBuf.put(p)
timeoutLeft = t.kp.Timeout
outstandingPing = true
}
// Send ping.
t.controlBuf.put(p)
}
}

// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
infof("transport: closing client transport due to idleness.")
t.Close()
return
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
}
// Amount of kp.Time remaining should be calculated from the time of the
// last read activity.
timeLeft := t.kp.Time
if dataRead {
timeLeft = time.Duration(atomic.LoadInt64(&t.lr.timeNano)) + t.kp.Time - time.Duration(time.Now().UTC().UnixNano())
}
// If a PING is outstanding, the amount of time to sleep here should be the
// minimum of timeoutLeft and timeLeft.
sleepDuration := timeLeft
if outstandingPing {
sleepDuration = minTime(timeLeft, timeoutLeft)
timeoutLeft -= sleepDuration
}

prevNano = time.Now().UTC().UnixNano()
timer := time.NewTimer(sleepDuration)
select {
case <-timer.C:
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
Expand All @@ -1351,6 +1409,106 @@ func (t *http2Client) keepalive() {
}
}

/*
// pinger sends out a PING frame, and waits for one of the following events to
// happen (kp.Timeout expires, top-level context is done, or the done channel
// is written to (this usually happens when there is read activity).
func (t *http2Client) pinger(send bool, done chan struct{}) {
if send {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
t.controlBuf.put(&ping{data: [8]byte{}})
}
timer := time.NewTimer(t.kp.Timeout)
select {
case <-timer.C:
t.Close()
return
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
case <-done:
if !timer.Stop() {
<-timer.C
}
return
}
}
// keepalive running in a separate goroutune makes sure the connection is alive
// by sending pings.
func (t *http2Client) keepalive() {
go t.activityMonitor()
ticker := time.NewTicker(defaultKeepaliveTickerDuration)
defer ticker.Stop()
var done chan struct{}
for {
select {
case <-ticker.C:
// If the amount of time elapsed since the last read is less than kp.Time
// and we have an active pinger routine, we need to cancel it.
lr := time.Since(time.Unix(atomic.LoadInt64(&t.lr.time), 0))
if lr < t.kp.Time {
if done != nil {
close(done)
done = nil
}
continue
}
// If the control gets here, it means more than kp.Time has elapsed since
// the last read, and we need to do one of the following:
// - If keepalive dormancy conditions are met, then go dormant.
// - If there is an active pinger routine, then do nothing.
// - If dormany conditions are not met and there is no active pinger
// routine, start one.
sendPing := true
goDormant := func() bool {
t.mu.Lock()
defer t.mu.Unlock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// If pinger routine is active, it probably means that one was started
// when there was an active stream, but now there is none, and
// therefore we should stop the pinger.
if done != nil {
close(done)
done = nil
}
// Make awakenKeepalive writable.
<-t.awakenKeepalive
return true
}
return false
}()
if goDormant {
select {
case <-t.awakenKeepalive:
// If the control gets here, it means that a ping was sent right
// after stream creation. We need to make sure we get an ack for it
// before kp.Timeout expires.
sendPing = false
case <-t.ctx.Done():
return
}
}
// Active pinger exists, so there is nothing much to do here.
if done != nil {
continue
}
done = make(chan struct{})
go t.pinger(sendPing, done)
case <-t.ctx.Done():
return
}
}
}
*/

func (t *http2Client) Error() <-chan struct{} {
return t.ctx.Done()
}
Expand Down

0 comments on commit c0a84df

Please sign in to comment.