Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: return better status for context err when writing header #5292

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 23 additions & 23 deletions internal/transport/http2_server.go
Expand Up @@ -21,7 +21,6 @@ package transport
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -53,10 +52,10 @@ import (
var (
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state.
ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
)

// serverConnectionCounter counts the number of connections a server has seen
Expand Down Expand Up @@ -931,11 +930,25 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
return true
}

func (t *http2Server) streamContextErr(s *Stream) error {
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
}

// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() || s.getState() == streamDone {
if s.updateHeaderSent() {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
return ErrIllegalHeaderWrite
}

if s.getState() == streamDone {
return t.streamContextErr(s)
}

s.hdrMu.Lock()
if md.Len() > 0 {
if s.header.Len() > 0 {
Expand All @@ -946,7 +959,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
if err := t.writeHeaderLocked(s); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GH won't let me comment on the right line, but on line 981, there's no guarantee this is a status error. I think it's always ErrConnClosing, in fact. Let's make that return status.Convert(err).Err() -- it's not really efficient or ideal, but it's only in an error condition so it should be okay. This means a closed connection will result in UNKNOWN, but since I'm pretty sure using statuses at all here is a bad idea in the first place, that should be fine. I think I'll separately change all the documentation on ServerStream to recommend against relying upon the returned errors' status codes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting changing the error inside writeHeaderLocked to be status.Convert(err).Err() or changing the handling of the return value to be status.Convert(err).Err()?

I changed the outside error, as that's the place where we converted it to status.Internal previously. Let me know if I understood wrong.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way is probably fine. There is only one other usage of writeHeaderLocked, in WriteStatus, and the ultimate user of that just logs the result.

s.hdrMu.Unlock()
return err
return status.Convert(err).Err()
}
s.hdrMu.Unlock()
return nil
Expand Down Expand Up @@ -1062,23 +1075,13 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
if _, ok := err.(ConnectionError); ok {
return err
}
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
return status.Errorf(codes.Internal, "transport: %v", err)
return err
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be deleted now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TODO or the entire conditional? The tests appear to pass with the entire conditional removed but I'm not familiar enough with this code to understand whether we still need to check if the stream is done elsewhere in Write (if it's called after writing headers).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like writeQueue.get checks whether the writeQueue is done. Is the idea that that channel is set when the stream is closed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just referring to the TODO. I don't think this whole thing should be removed or else we'll potentially write operations into the control buffer for closed streams. If the controlbuf isn't checking that the stream is still valid and writes frames, then that's an HTTP/2 protocol violation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack. reverted and removed the TODO.

s.cancel()
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
return t.streamContextErr(s)
}
}
df := &dataFrame{
Expand All @@ -1088,12 +1091,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select {
case <-t.done:
return ErrConnClosing
default:
}
return ContextErr(s.ctx.Err())
return t.streamContextErr(s)
}
return t.controlBuf.put(df)
}
Expand Down Expand Up @@ -1254,6 +1252,8 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {

// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
s.cancel()

oldState := s.swapState(streamDone)
if oldState == streamDone {
// If the stream was already done, return.
Expand Down
4 changes: 2 additions & 2 deletions test/end2end_test.go
Expand Up @@ -4964,8 +4964,8 @@ func testClientSendDataAfterCloseSend(t *testing.T, e env) {
}
if err := stream.SendMsg(nil); err == nil {
t.Error("expected error sending message on stream after stream closed due to illegal data")
} else if status.Code(err) != codes.Internal {
t.Errorf("expected internal error, instead received '%v'", err)
} else if status.Code(err) != codes.Canceled {
t.Errorf("expected cancel error, instead received '%v'", err)
}
return nil
}}
Expand Down