Skip to content

Commit

Permalink
server: return better status for context err when writing header (#5292)
Browse files Browse the repository at this point in the history
  • Loading branch information
idiamond-stripe committed Apr 8, 2022
1 parent 5682cc6 commit 924e484
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 30 deletions.
59 changes: 31 additions & 28 deletions internal/transport/http2_server.go
Expand Up @@ -21,7 +21,6 @@ package transport
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -53,10 +52,10 @@ import (
var (
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state.
ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)

// serverConnectionCounter counts the number of connections a server has seen
Expand Down Expand Up @@ -931,11 +930,25 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
return true
}

func (t *http2Server) streamContextErr(s *Stream) error {
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
}

// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() || s.getState() == streamDone {
if s.updateHeaderSent() {
return ErrIllegalHeaderWrite
}

if s.getState() == streamDone {
return t.streamContextErr(s)
}

s.hdrMu.Lock()
if md.Len() > 0 {
if s.header.Len() > 0 {
Expand All @@ -946,7 +959,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
if err := t.writeHeaderLocked(s); err != nil {
s.hdrMu.Unlock()
return err
return status.Convert(err).Err()
}
s.hdrMu.Unlock()
return nil
Expand Down Expand Up @@ -1062,23 +1075,12 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
if _, ok := err.(ConnectionError); ok {
return err
}
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
return status.Errorf(codes.Internal, "transport: %v", err)
return err
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
return t.streamContextErr(s)
}
}
df := &dataFrame{
Expand All @@ -1088,12 +1090,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
return t.streamContextErr(s)
}
return t.controlBuf.put(df)
}
Expand Down Expand Up @@ -1229,10 +1226,6 @@ func (t *http2Server) Close() {

// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()

t.mu.Lock()
if _, ok := t.activeStreams[s.id]; ok {
Expand All @@ -1254,6 +1247,11 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {

// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()

oldState := s.swapState(streamDone)
if oldState == streamDone {
// If the stream was already done, return.
Expand All @@ -1273,6 +1271,11 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h

// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()

s.swapState(streamDone)
t.deleteStream(s, eosReceived)

Expand Down
4 changes: 2 additions & 2 deletions test/end2end_test.go
Expand Up @@ -4964,8 +4964,8 @@ func testClientSendDataAfterCloseSend(t *testing.T, e env) {
}
if err := stream.SendMsg(nil); err == nil {
t.Error("expected error sending message on stream after stream closed due to illegal data")
} else if status.Code(err) != codes.Internal {
t.Errorf("expected internal error, instead received '%v'", err)
} else if status.Code(err) != codes.Canceled {
t.Errorf("expected cancel error, instead received '%v'", err)
}
return nil
}}
Expand Down

0 comments on commit 924e484

Please sign in to comment.