Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport/http2_server : Move up streamID validation in operate headers #4873

Merged
merged 11 commits into from Nov 2, 2021
30 changes: 18 additions & 12 deletions internal/transport/http2_server.go
Expand Up @@ -123,6 +123,7 @@ type http2Server struct {
bufferPool *bufferPool

connectionID uint64
maxStreamMu sync.Mutex // guard the maximum stream ID
uds5501 marked this conversation as resolved.
Show resolved Hide resolved
}

// NewServerTransport creates a http2 transport with conn and configuration
Expand Down Expand Up @@ -334,6 +335,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,

// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()

streamID := frame.Header().StreamID

// frame.Truncated is set to true when framer detects that the current header
Expand All @@ -348,14 +353,22 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return false
}

if streamID%2 != 1 || streamID <= t.maxStreamID {
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
return true
}
t.maxStreamID = streamID

buf := newRecvBuffer()
s := &Stream{
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
}

var (
// If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode.
Expand Down Expand Up @@ -498,16 +511,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.cancel()
return false
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
s.cancel()
return true
}
t.maxStreamID = streamID
if httpMethod != http.MethodPost {
t.mu.Unlock()
if logger.V(logLevel) {
Expand Down Expand Up @@ -1293,13 +1296,16 @@ var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
t.maxStreamMu.Lock()
uds5501 marked this conversation as resolved.
Show resolved Hide resolved
sid := t.maxStreamID
uds5501 marked this conversation as resolved.
Show resolved Hide resolved
t.maxStreamMu.Unlock()
uds5501 marked this conversation as resolved.
Show resolved Hide resolved

t.mu.Lock()
if t.state == closing { // TODO(mmukhi): This seems unnecessary.
t.mu.Unlock()
// The transport is closing.
return false, ErrConnClosing
}
sid := t.maxStreamID
if !g.headsUp {
// Stop accepting more streams now.
t.state = draining
Expand Down