Skip to content

Commit

Permalink
transport: remove decodeState from http_client
Browse files Browse the repository at this point in the history
  • Loading branch information
JNProtzman committed Jan 12, 2020
1 parent 20bce9a commit a4a5f28
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 16 deletions.
147 changes: 139 additions & 8 deletions internal/transport/http2_client.go
Expand Up @@ -30,9 +30,11 @@ import (
"sync/atomic"
"time"

"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"

spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -1179,14 +1181,139 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

state := &decodeState{}
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
state.data.isGRPC = !initialHeader
if err := state.decodeHeader(frame); err != nil {
// frame.Truncated is set to true when framer detects that the current header
// list size hits MaxHeaderListSize limit.
if frame.Truncated {
err := status.Error(codes.Internal, "peer header list size exceeded limit")
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
return
}

var (
// If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
isGRPC = !initialHeader
mdata = make(map[string][]string)
statusGen *status.Status
grpcErr error
httpErr error
rawStatusCode *int
httpStatus *int
encoding string
contentTypeErr string
rawStatusMsg string
)

for _, hf := range frame.Fields {
switch hf.Name {
case "content-type":
_, validContentType := contentSubtype(hf.Value)
if !validContentType {
contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", hf.Value)
break
}
// TODO: do we want to propagate the whole content-type in the metadata,
// or come up with a way to just propagate the content-subtype if it was set?
// ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"}
// in the metadata?
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
isGRPC = true
case "grpc-encoding":
encoding = hf.Value
case "grpc-status":
code, err := strconv.Atoi(hf.Value)
if err != nil {
grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
break
}
rawStatusCode = &code
case "grpc-message":
rawStatusMsg = decodeGrpcMessage(hf.Value)
case "grpc-status-details-bin":
v, err := decodeBinHeader(hf.Value)
if err != nil {
grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
break
}
s := &spb.Status{}
if err := proto.Unmarshal(v, s); err != nil {
grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
break
}
statusGen = status.FromProto(s)
case ":status":
code, err := strconv.Atoi(hf.Value)
if err != nil {
httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
break
}
httpStatus = &code
case "grpc-tags-bin":
v, err := decodeBinHeader(hf.Value)
if err != nil {
grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], string(v))
case "grpc-trace-bin":
v, err := decodeBinHeader(hf.Value)
if err != nil {
grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], string(v))
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], v)
}
}

if isGRPC {
if grpcErr != nil {
t.closeStream(s, grpcErr, true, http2.ErrCodeProtocol, status.Convert(grpcErr), nil, endStream)
return
}
if rawStatusCode == nil && statusGen == nil {
// gRPC status doesn't exist.
// Set rawStatusCode to be unknown and return nil error.
// So that, if the stream has ended this Unknown status
// will be propagated to the user.
// Otherwise, it will be ignored. In which case, status from
// a later trailer, that has StreamEnded flag set, is propagated.
code := int(codes.Unknown)
rawStatusCode = &code
}
} else {
if httpErr != nil {
t.closeStream(s, httpErr, true, http2.ErrCodeProtocol, status.Convert(httpErr), nil, endStream)
return
}

var (
code = codes.Internal // when header does not include HTTP status, return INTERNAL
ok bool
)

if httpStatus != nil {
code, ok = HTTPStatusConvTab[*(httpStatus)]
if !ok {
code = codes.Unknown
}
}

msg := constructHTTPErrMsg(httpStatus, contentTypeErr)
if err := status.Error(code, msg); err != nil {
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
return
}
}

isHeader := false
defer func() {
if t.statsHandler != nil {
Expand Down Expand Up @@ -1217,9 +1344,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
s.recvCompress = state.data.encoding
if len(state.data.mdata) > 0 {
s.header = state.data.mdata
s.recvCompress = encoding
if len(mdata) > 0 {
s.header = mdata
}
} else {
// HEADERS frame block carries a Trailers-Only.
Expand All @@ -1234,7 +1361,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {

// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)

if statusGen == nil {
statusGen = status.New(codes.Code(int32(*(rawStatusCode))), rawStatusMsg)
}
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
}

// reader runs as a separate goroutine in charge of reading data from network
Expand Down
28 changes: 20 additions & 8 deletions internal/transport/http_util.go
Expand Up @@ -222,14 +222,6 @@ func contentType(contentSubtype string) string {
return baseContentType + "+" + contentSubtype
}

func (d *decodeState) status() *status.Status {
if d.data.statusGen == nil {
// No status-details were provided; generate status using code/msg.
d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg)
}
return d.data.statusGen
}

const binHdrSuffix = "-bin"

func encodeBinHeader(v []byte) string {
Expand Down Expand Up @@ -330,6 +322,26 @@ func (d *decodeState) constructHTTPErrMsg() string {
return strings.Join(errMsgs, "; ")
}

// constructErrMsg constructs error message to be returned in HTTP fallback mode.
// Format: HTTP status code and its corresponding message + content-type error message.
func constructHTTPErrMsg(httpStatus *int, contentTypeErr string) string {
var errMsgs []string

if httpStatus == nil {
errMsgs = append(errMsgs, "malformed header: missing HTTP status")
} else {
errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(httpStatus)), *httpStatus))
}

if contentTypeErr == "" {
errMsgs = append(errMsgs, "transport: missing content-type field")
} else {
errMsgs = append(errMsgs, contentTypeErr)
}

return strings.Join(errMsgs, "; ")
}

func (d *decodeState) addMetadata(k, v string) {
if d.data.mdata == nil {
d.data.mdata = make(map[string][]string)
Expand Down

0 comments on commit a4a5f28

Please sign in to comment.