From a4a5f285b2d8aca3257fb85dc8a712aa4c141684 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sun, 12 Jan 2020 00:22:01 -0500 Subject: [PATCH] transport: remove decodeState from http_client --- internal/transport/http2_client.go | 147 +++++++++++++++++++++++++++-- internal/transport/http_util.go | 28 ++++-- 2 files changed, 159 insertions(+), 16 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 3f30efa0015..eb515b297de 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -30,9 +30,11 @@ import ( "sync/atomic" "time" + "github.com/golang/protobuf/proto" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" @@ -1179,14 +1181,139 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - state := &decodeState{} - // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. - state.data.isGRPC = !initialHeader - if err := state.decodeHeader(frame); err != nil { + // frame.Truncated is set to true when framer detects that the current header + // list size hits MaxHeaderListSize limit. + if frame.Truncated { + err := status.Error(codes.Internal, "peer header list size exceeded limit") t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) return } + var ( + // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. + isGRPC = !initialHeader + mdata = make(map[string][]string) + statusGen *status.Status + grpcErr error + httpErr error + rawStatusCode *int + httpStatus *int + encoding string + contentTypeErr string + rawStatusMsg string + ) + + for _, hf := range frame.Fields { + switch hf.Name { + case "content-type": + _, validContentType := contentSubtype(hf.Value) + if !validContentType { + contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", hf.Value) + break + } + // TODO: do we want to propagate the whole content-type in the metadata, + // or come up with a way to just propagate the content-subtype if it was set? + // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} + // in the metadata? + mdata[hf.Name] = append(mdata[hf.Name], hf.Value) + isGRPC = true + case "grpc-encoding": + encoding = hf.Value + case "grpc-status": + code, err := strconv.Atoi(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + break + } + rawStatusCode = &code + case "grpc-message": + rawStatusMsg = decodeGrpcMessage(hf.Value) + case "grpc-status-details-bin": + v, err := decodeBinHeader(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + break + } + s := &spb.Status{} + if err := proto.Unmarshal(v, s); err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + break + } + statusGen = status.FromProto(s) + case ":status": + code, err := strconv.Atoi(hf.Value) + if err != nil { + httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) + break + } + httpStatus = &code + case "grpc-tags-bin": + v, err := decodeBinHeader(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) + break + } + mdata[hf.Name] = append(mdata[hf.Name], string(v)) + case "grpc-trace-bin": + v, err := decodeBinHeader(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) + break + } + mdata[hf.Name] = append(mdata[hf.Name], string(v)) + default: + if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) { + break + } + v, err := decodeMetadataHeader(hf.Name, hf.Value) + if err != nil { + errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) + break + } + mdata[hf.Name] = append(mdata[hf.Name], v) + } + } + + if isGRPC { + if grpcErr != nil { + t.closeStream(s, grpcErr, true, http2.ErrCodeProtocol, status.Convert(grpcErr), nil, endStream) + return + } + if rawStatusCode == nil && statusGen == nil { + // gRPC status doesn't exist. + // Set rawStatusCode to be unknown and return nil error. + // So that, if the stream has ended this Unknown status + // will be propagated to the user. + // Otherwise, it will be ignored. In which case, status from + // a later trailer, that has StreamEnded flag set, is propagated. + code := int(codes.Unknown) + rawStatusCode = &code + } + } else { + if httpErr != nil { + t.closeStream(s, httpErr, true, http2.ErrCodeProtocol, status.Convert(httpErr), nil, endStream) + return + } + + var ( + code = codes.Internal // when header does not include HTTP status, return INTERNAL + ok bool + ) + + if httpStatus != nil { + code, ok = HTTPStatusConvTab[*(httpStatus)] + if !ok { + code = codes.Unknown + } + } + + msg := constructHTTPErrMsg(httpStatus, contentTypeErr) + if err := status.Error(code, msg); err != nil { + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } + } + isHeader := false defer func() { if t.statsHandler != nil { @@ -1217,9 +1344,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // These values can be set without any synchronization because // stream goroutine will read it only after seeing a closed // headerChan which we'll close after setting this. - s.recvCompress = state.data.encoding - if len(state.data.mdata) > 0 { - s.header = state.data.mdata + s.recvCompress = encoding + if len(mdata) > 0 { + s.header = mdata } } else { // HEADERS frame block carries a Trailers-Only. @@ -1234,7 +1361,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // if client received END_STREAM from server while stream was still active, send RST_STREAM rst := s.getState() == streamActive - t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true) + + if statusGen == nil { + statusGen = status.New(codes.Code(int32(*(rawStatusCode))), rawStatusMsg) + } + t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true) } // reader runs as a separate goroutine in charge of reading data from network diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 8f5f3349d90..15e60013c6c 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -222,14 +222,6 @@ func contentType(contentSubtype string) string { return baseContentType + "+" + contentSubtype } -func (d *decodeState) status() *status.Status { - if d.data.statusGen == nil { - // No status-details were provided; generate status using code/msg. - d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg) - } - return d.data.statusGen -} - const binHdrSuffix = "-bin" func encodeBinHeader(v []byte) string { @@ -330,6 +322,26 @@ func (d *decodeState) constructHTTPErrMsg() string { return strings.Join(errMsgs, "; ") } +// constructErrMsg constructs error message to be returned in HTTP fallback mode. +// Format: HTTP status code and its corresponding message + content-type error message. +func constructHTTPErrMsg(httpStatus *int, contentTypeErr string) string { + var errMsgs []string + + if httpStatus == nil { + errMsgs = append(errMsgs, "malformed header: missing HTTP status") + } else { + errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(httpStatus)), *httpStatus)) + } + + if contentTypeErr == "" { + errMsgs = append(errMsgs, "transport: missing content-type field") + } else { + errMsgs = append(errMsgs, contentTypeErr) + } + + return strings.Join(errMsgs, "; ") +} + func (d *decodeState) addMetadata(k, v string) { if d.data.mdata == nil { d.data.mdata = make(map[string][]string)