diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 48c5e52edae..64ebd4a167f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1254,11 +1254,97 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - state := &decodeState{} - // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. - state.data.isGRPC = !initialHeader - if h2code, err := state.decodeHeader(frame); err != nil { - t.closeStream(s, err, true, h2code, status.Convert(err), nil, endStream) + // frame.Truncated is set to true when framer detects that the current header + // list size hits MaxHeaderListSize limit. + if frame.Truncated { + se := status.New(codes.Internal, "peer header list size exceeded limit") + t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream) + return + } + + var ( + // If a gRPC Response-Headers has already been received, then it means + // that the peer is speaking gRPC and we are in gRPC mode. + isGRPC = !initialHeader + mdata = make(map[string][]string) + contentTypeErr string + grpcMessage string + statusGen *status.Status + + httpStatus string + rawStatus string + // headerError is set if an error is encountered while parsing the headers + headerError string + ) + + for _, hf := range frame.Fields { + switch hf.Name { + case "content-type": + if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType { + contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", hf.Value) + break + } + mdata[hf.Name] = append(mdata[hf.Name], hf.Value) + isGRPC = true + case "grpc-encoding": + s.recvCompress = hf.Value + case "grpc-status": + rawStatus = hf.Value + case "grpc-message": + grpcMessage = decodeGrpcMessage(hf.Value) + case "grpc-status-details-bin": + var err error + statusGen, err = decodeGRPCStatusDetails(hf.Value) + if err != nil { + headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err) + } + case ":status": + httpStatus = hf.Value + default: + if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) { + break + } + v, err := decodeMetadataHeader(hf.Name, hf.Value) + if err != nil { + headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err) + logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) + break + } + mdata[hf.Name] = append(mdata[hf.Name], v) + } + } + + if !isGRPC { + var ( + code = codes.Internal // when header does not include HTTP status, return INTERNAL + httpStatusCode int + ) + + if httpStatus != "" { + c, err := strconv.ParseInt(httpStatus, 10, 32) + if err != nil { + se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err)) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + return + } + httpStatusCode = int(c) + + var ok bool + code, ok = HTTPStatusConvTab[httpStatusCode] + if !ok { + code = codes.Unknown + } + } + + // Verify the HTTP response is a 200. + se := status.New(code, constructHTTPErrMsg(&httpStatusCode, contentTypeErr)) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + return + } + + if headerError != "" { + se := status.New(codes.Internal, headerError) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } @@ -1293,9 +1379,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // These values can be set without any synchronization because // stream goroutine will read it only after seeing a closed // headerChan which we'll close after setting this. - s.recvCompress = state.data.encoding - if len(state.data.mdata) > 0 { - s.header = state.data.mdata + if len(mdata) > 0 { + s.header = mdata } } else { // HEADERS frame block carries a Trailers-Only. @@ -1308,9 +1393,23 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } + if statusGen == nil { + rawStatusCode := codes.Unknown + if rawStatus != "" { + code, err := strconv.ParseInt(rawStatus, 10, 32) + if err != nil { + se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err)) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + return + } + rawStatusCode = codes.Code(uint32(code)) + } + statusGen = status.New(rawStatusCode, grpcMessage) + } + // if client received END_STREAM from server while stream was still active, send RST_STREAM rst := s.getState() == streamActive - t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true) + t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true) } // reader runs as a separate goroutine in charge of reading data from network diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index c7dee140cf1..2771e224f7b 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -180,14 +180,6 @@ func isWhitelistedHeader(hdr string) bool { } } -func (d *decodeState) status() *status.Status { - if d.data.statusGen == nil { - // No status-details were provided; generate status using code/msg. - d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg) - } - return d.data.statusGen -} - const binHdrSuffix = "-bin" func encodeBinHeader(v []byte) string { @@ -217,6 +209,18 @@ func decodeMetadataHeader(k, v string) (string, error) { return v, nil } +func decodeGRPCStatusDetails(rawDetails string) (*status.Status, error) { + v, err := decodeBinHeader(rawDetails) + if err != nil { + return nil, err + } + st := &spb.Status{} + if err = proto.Unmarshal(v, st); err != nil { + return nil, err + } + return status.FromProto(st), nil +} + func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode, error) { // frame.Truncated is set to true when framer detects that the current header // list size hits MaxHeaderListSize limit. @@ -271,18 +275,24 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode // constructErrMsg constructs error message to be returned in HTTP fallback mode. // Format: HTTP status code and its corresponding message + content-type error message. func (d *decodeState) constructHTTPErrMsg() string { + return constructHTTPErrMsg(d.data.httpStatus, d.data.contentTypeErr) +} + +// constructErrMsg constructs error message to be returned in HTTP fallback mode. +// Format: HTTP status code and its corresponding message + content-type error message. +func constructHTTPErrMsg(httpStatus *int, contentTypeErr string) string { var errMsgs []string - if d.data.httpStatus == nil { + if httpStatus == nil { errMsgs = append(errMsgs, "malformed header: missing HTTP status") } else { - errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus)) + errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(httpStatus)), *httpStatus)) } - if d.data.contentTypeErr == "" { + if contentTypeErr == "" { errMsgs = append(errMsgs, "transport: missing content-type field") } else { - errMsgs = append(errMsgs, d.data.contentTypeErr) + errMsgs = append(errMsgs, contentTypeErr) } return strings.Join(errMsgs, "; ")