Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: return better status for context err when writing header #5292

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 31 additions & 28 deletions internal/transport/http2_server.go
Expand Up @@ -21,7 +21,6 @@ package transport
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -53,10 +52,10 @@ import (
var (
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state.
ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)

// serverConnectionCounter counts the number of connections a server has seen
Expand Down Expand Up @@ -931,11 +930,25 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
return true
}

func (t *http2Server) streamContextErr(s *Stream) error {
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
}

// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() || s.getState() == streamDone {
if s.updateHeaderSent() {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
return ErrIllegalHeaderWrite
}

if s.getState() == streamDone {
return t.streamContextErr(s)
}

s.hdrMu.Lock()
if md.Len() > 0 {
if s.header.Len() > 0 {
Expand All @@ -946,7 +959,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
if err := t.writeHeaderLocked(s); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GH won't let me comment on the right line, but on line 981, there's no guarantee this is a status error. I think it's always ErrConnClosing, in fact. Let's make that return status.Convert(err).Err() -- it's not really efficient or ideal, but it's only in an error condition so it should be okay. This means a closed connection will result in UNKNOWN, but since I'm pretty sure using statuses at all here is a bad idea in the first place, that should be fine. I think I'll separately change all the documentation on ServerStream to recommend against relying upon the returned errors' status codes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting changing the error inside writeHeaderLocked to be status.Convert(err).Err() or changing the handling of the return value to be status.Convert(err).Err()?

I changed the outside error, as that's the place where we converted it to status.Internal previously. Let me know if I understood wrong.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way is probably fine. There is only one other usage of writeHeaderLocked, in WriteStatus, and the ultimate user of that just logs the result.

s.hdrMu.Unlock()
return err
return status.Convert(err).Err()
}
s.hdrMu.Unlock()
return nil
Expand Down Expand Up @@ -1062,23 +1075,12 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
if _, ok := err.(ConnectionError); ok {
return err
}
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
return status.Errorf(codes.Internal, "transport: %v", err)
return err
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel()
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
return t.streamContextErr(s)
}
}
df := &dataFrame{
Expand All @@ -1088,12 +1090,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
return t.streamContextErr(s)
}
return t.controlBuf.put(df)
}
Expand Down Expand Up @@ -1229,10 +1226,6 @@ func (t *http2Server) Close() {

// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()

t.mu.Lock()
if _, ok := t.activeStreams[s.id]; ok {
Expand All @@ -1254,6 +1247,11 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {

// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()

oldState := s.swapState(streamDone)
if oldState == streamDone {
// If the stream was already done, return.
Expand All @@ -1273,6 +1271,11 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h

// closeStream clears the footprint of a stream when the stream is not needed any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()

s.swapState(streamDone)
t.deleteStream(s, eosReceived)

Expand Down
4 changes: 2 additions & 2 deletions test/end2end_test.go
Expand Up @@ -4964,8 +4964,8 @@ func testClientSendDataAfterCloseSend(t *testing.T, e env) {
}
if err := stream.SendMsg(nil); err == nil {
t.Error("expected error sending message on stream after stream closed due to illegal data")
} else if status.Code(err) != codes.Internal {
t.Errorf("expected internal error, instead received '%v'", err)
} else if status.Code(err) != codes.Canceled {
t.Errorf("expected cancel error, instead received '%v'", err)
}
return nil
}}
Expand Down