Skip to content

Commit

Permalink
transport: various simplifications noticed during #4447 (#4455)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 18, 2021
1 parent c9c9a75 commit 23a83dd
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 29 deletions.
15 changes: 7 additions & 8 deletions internal/transport/controlbuf.go
Expand Up @@ -296,7 +296,7 @@ type controlBuffer struct {
// closed and nilled when transportResponseFrames drops below the
// threshold. Both fields are protected by mu.
transportResponseFrames int
trfChan atomic.Value // *chan struct{}
trfChan atomic.Value // chan struct{}
}

func newControlBuffer(done <-chan struct{}) *controlBuffer {
Expand All @@ -310,10 +310,10 @@ func newControlBuffer(done <-chan struct{}) *controlBuffer {
// throttle blocks if there are too many incomingSettings/cleanupStreams in the
// controlbuf.
func (c *controlBuffer) throttle() {
ch, _ := c.trfChan.Load().(*chan struct{})
ch, _ := c.trfChan.Load().(chan struct{})
if ch != nil {
select {
case <-*ch:
case <-ch:
case <-c.done:
}
}
Expand Down Expand Up @@ -347,8 +347,7 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (b
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are adding the frame that puts us over the threshold; create
// a throttling channel.
ch := make(chan struct{})
c.trfChan.Store(&ch)
c.trfChan.Store(make(chan struct{}))
}
}
c.mu.Unlock()
Expand Down Expand Up @@ -389,9 +388,9 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are removing the frame that put us over the
// threshold; close and clear the throttling channel.
ch := c.trfChan.Load().(*chan struct{})
close(*ch)
c.trfChan.Store((*chan struct{})(nil))
ch := c.trfChan.Load().(chan struct{})
close(ch)
c.trfChan.Store((chan struct{})(nil))
}
c.transportResponseFrames--
}
Expand Down
3 changes: 1 addition & 2 deletions internal/transport/handler_server.go
Expand Up @@ -141,9 +141,8 @@ type serverHandlerTransport struct {
stats stats.Handler
}

func (ht *serverHandlerTransport) Close() error {
func (ht *serverHandlerTransport) Close() {
ht.closeOnce.Do(ht.closeCloseChanOnce)
return nil
}

func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
Expand Down
8 changes: 3 additions & 5 deletions internal/transport/http2_client.go
Expand Up @@ -399,11 +399,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
}
// If it's a connection error, let reader goroutine handle it
// since there might be data in the buffers.
if _, ok := err.(net.Error); !ok {
t.conn.Close()
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
t.conn.Close()
close(t.writerDone)
}()
return t, nil
Expand Down
23 changes: 10 additions & 13 deletions internal/transport/http2_server.go
Expand Up @@ -102,11 +102,11 @@ type http2Server struct {

mu sync.Mutex // guard the following

// drainChan is initialized when drain(...) is called the first time.
// drainChan is initialized when Drain() is called the first time.
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
// Then an independent goroutine will be launched to later send the second GoAway.
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
// Thus call to Drain() will be a no-op if drainChan is already initialized since draining is
// already underway.
drainChan chan struct{}
state transportState
Expand Down Expand Up @@ -1059,12 +1059,12 @@ func (t *http2Server) keepalive() {
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.drain(http2.ErrCodeNo, []byte{})
t.Drain()
return
}
idleTimer.Reset(val)
case <-ageTimer.C:
t.drain(http2.ErrCodeNo, []byte{})
t.Drain()
ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-ageTimer.C:
Expand Down Expand Up @@ -1118,19 +1118,21 @@ func (t *http2Server) keepalive() {
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() error {
func (t *http2Server) Close() {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
return errors.New("transport: Close() was already called")
return
}
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
t.mu.Unlock()
t.controlBuf.finish()
close(t.done)
err := t.conn.Close()
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
logger.Infof("transport: error closing conn during Close: %v", err)
}
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
Expand All @@ -1142,7 +1144,6 @@ func (t *http2Server) Close() error {
connEnd := &stats.ConnEnd{}
t.stats.HandleConn(t.ctx, connEnd)
}
return err
}

// deleteStream deletes the stream s from transport's active streams.
Expand Down Expand Up @@ -1207,17 +1208,13 @@ func (t *http2Server) RemoteAddr() net.Addr {
}

func (t *http2Server) Drain() {
t.drain(http2.ErrCodeNo, []byte{})
}

func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainChan != nil {
return
}
t.drainChan = make(chan struct{})
t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
}

var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport.go
Expand Up @@ -694,7 +694,7 @@ type ServerTransport interface {
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
// handlers will be terminated asynchronously.
Close() error
Close()

// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
Expand Down

0 comments on commit 23a83dd

Please sign in to comment.