From a4a5f285b2d8aca3257fb85dc8a712aa4c141684 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sun, 12 Jan 2020 00:22:01 -0500 Subject: [PATCH 01/18] transport: remove decodeState from http_client --- internal/transport/http2_client.go | 147 +++++++++++++++++++++++++++-- internal/transport/http_util.go | 28 ++++-- 2 files changed, 159 insertions(+), 16 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 3f30efa0015..eb515b297de 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -30,9 +30,11 @@ import ( "sync/atomic" "time" + "github.com/golang/protobuf/proto" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" @@ -1179,14 +1181,139 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - state := &decodeState{} - // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. - state.data.isGRPC = !initialHeader - if err := state.decodeHeader(frame); err != nil { + // frame.Truncated is set to true when framer detects that the current header + // list size hits MaxHeaderListSize limit. + if frame.Truncated { + err := status.Error(codes.Internal, "peer header list size exceeded limit") t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) return } + var ( + // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. + isGRPC = !initialHeader + mdata = make(map[string][]string) + statusGen *status.Status + grpcErr error + httpErr error + rawStatusCode *int + httpStatus *int + encoding string + contentTypeErr string + rawStatusMsg string + ) + + for _, hf := range frame.Fields { + switch hf.Name { + case "content-type": + _, validContentType := contentSubtype(hf.Value) + if !validContentType { + contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", hf.Value) + break + } + // TODO: do we want to propagate the whole content-type in the metadata, + // or come up with a way to just propagate the content-subtype if it was set? + // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} + // in the metadata? + mdata[hf.Name] = append(mdata[hf.Name], hf.Value) + isGRPC = true + case "grpc-encoding": + encoding = hf.Value + case "grpc-status": + code, err := strconv.Atoi(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + break + } + rawStatusCode = &code + case "grpc-message": + rawStatusMsg = decodeGrpcMessage(hf.Value) + case "grpc-status-details-bin": + v, err := decodeBinHeader(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + break + } + s := &spb.Status{} + if err := proto.Unmarshal(v, s); err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + break + } + statusGen = status.FromProto(s) + case ":status": + code, err := strconv.Atoi(hf.Value) + if err != nil { + httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) + break + } + httpStatus = &code + case "grpc-tags-bin": + v, err := decodeBinHeader(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) + break + } + mdata[hf.Name] = append(mdata[hf.Name], string(v)) + case "grpc-trace-bin": + v, err := decodeBinHeader(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) + break + } + mdata[hf.Name] = append(mdata[hf.Name], string(v)) + default: + if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) { + break + } + v, err := decodeMetadataHeader(hf.Name, hf.Value) + if err != nil { + errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) + break + } + mdata[hf.Name] = append(mdata[hf.Name], v) + } + } + + if isGRPC { + if grpcErr != nil { + t.closeStream(s, grpcErr, true, http2.ErrCodeProtocol, status.Convert(grpcErr), nil, endStream) + return + } + if rawStatusCode == nil && statusGen == nil { + // gRPC status doesn't exist. + // Set rawStatusCode to be unknown and return nil error. + // So that, if the stream has ended this Unknown status + // will be propagated to the user. + // Otherwise, it will be ignored. In which case, status from + // a later trailer, that has StreamEnded flag set, is propagated. + code := int(codes.Unknown) + rawStatusCode = &code + } + } else { + if httpErr != nil { + t.closeStream(s, httpErr, true, http2.ErrCodeProtocol, status.Convert(httpErr), nil, endStream) + return + } + + var ( + code = codes.Internal // when header does not include HTTP status, return INTERNAL + ok bool + ) + + if httpStatus != nil { + code, ok = HTTPStatusConvTab[*(httpStatus)] + if !ok { + code = codes.Unknown + } + } + + msg := constructHTTPErrMsg(httpStatus, contentTypeErr) + if err := status.Error(code, msg); err != nil { + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } + } + isHeader := false defer func() { if t.statsHandler != nil { @@ -1217,9 +1344,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // These values can be set without any synchronization because // stream goroutine will read it only after seeing a closed // headerChan which we'll close after setting this. - s.recvCompress = state.data.encoding - if len(state.data.mdata) > 0 { - s.header = state.data.mdata + s.recvCompress = encoding + if len(mdata) > 0 { + s.header = mdata } } else { // HEADERS frame block carries a Trailers-Only. @@ -1234,7 +1361,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // if client received END_STREAM from server while stream was still active, send RST_STREAM rst := s.getState() == streamActive - t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true) + + if statusGen == nil { + statusGen = status.New(codes.Code(int32(*(rawStatusCode))), rawStatusMsg) + } + t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true) } // reader runs as a separate goroutine in charge of reading data from network diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 8f5f3349d90..15e60013c6c 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -222,14 +222,6 @@ func contentType(contentSubtype string) string { return baseContentType + "+" + contentSubtype } -func (d *decodeState) status() *status.Status { - if d.data.statusGen == nil { - // No status-details were provided; generate status using code/msg. - d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg) - } - return d.data.statusGen -} - const binHdrSuffix = "-bin" func encodeBinHeader(v []byte) string { @@ -330,6 +322,26 @@ func (d *decodeState) constructHTTPErrMsg() string { return strings.Join(errMsgs, "; ") } +// constructErrMsg constructs error message to be returned in HTTP fallback mode. +// Format: HTTP status code and its corresponding message + content-type error message. +func constructHTTPErrMsg(httpStatus *int, contentTypeErr string) string { + var errMsgs []string + + if httpStatus == nil { + errMsgs = append(errMsgs, "malformed header: missing HTTP status") + } else { + errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(httpStatus)), *httpStatus)) + } + + if contentTypeErr == "" { + errMsgs = append(errMsgs, "transport: missing content-type field") + } else { + errMsgs = append(errMsgs, contentTypeErr) + } + + return strings.Join(errMsgs, "; ") +} + func (d *decodeState) addMetadata(k, v string) { if d.data.mdata == nil { d.data.mdata = make(map[string][]string) From 3f62a6eb4b529f81644f23524674ac4982918315 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sun, 26 Jan 2020 20:45:42 -0500 Subject: [PATCH 02/18] Remove grpcErr & httpErr --- internal/transport/http2_client.go | 61 ++++++++++++++++++------------ 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index eb515b297de..4339796e246 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1194,11 +1194,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { isGRPC = !initialHeader mdata = make(map[string][]string) statusGen *status.Status - grpcErr error - httpErr error rawStatusCode *int httpStatus *int - encoding string contentTypeErr string rawStatusMsg string ) @@ -1218,11 +1215,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { mdata[hf.Name] = append(mdata[hf.Name], hf.Value) isGRPC = true case "grpc-encoding": - encoding = hf.Value + if !endStream { + s.recvCompress = hf.Value + } case "grpc-status": code, err := strconv.Atoi(hf.Value) if err != nil { - grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + if isGRPC { + err = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } break } rawStatusCode = &code @@ -1231,33 +1234,53 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { case "grpc-status-details-bin": v, err := decodeBinHeader(hf.Value) if err != nil { - grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + if isGRPC { + err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } break } - s := &spb.Status{} - if err := proto.Unmarshal(v, s); err != nil { - grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + pbStatus := &spb.Status{} + if err := proto.Unmarshal(v, pbStatus); err != nil { + if isGRPC { + err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } break } - statusGen = status.FromProto(s) + statusGen = status.FromProto(pbStatus) case ":status": code, err := strconv.Atoi(hf.Value) if err != nil { - httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) + if !isGRPC { + err = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } break } httpStatus = &code case "grpc-tags-bin": v, err := decodeBinHeader(hf.Value) if err != nil { - grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) + if isGRPC { + err = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } break } mdata[hf.Name] = append(mdata[hf.Name], string(v)) case "grpc-trace-bin": v, err := decodeBinHeader(hf.Value) if err != nil { - grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) + if isGRPC { + err = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } break } mdata[hf.Name] = append(mdata[hf.Name], string(v)) @@ -1275,10 +1298,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } if isGRPC { - if grpcErr != nil { - t.closeStream(s, grpcErr, true, http2.ErrCodeProtocol, status.Convert(grpcErr), nil, endStream) - return - } if rawStatusCode == nil && statusGen == nil { // gRPC status doesn't exist. // Set rawStatusCode to be unknown and return nil error. @@ -1290,11 +1309,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { rawStatusCode = &code } } else { - if httpErr != nil { - t.closeStream(s, httpErr, true, http2.ErrCodeProtocol, status.Convert(httpErr), nil, endStream) - return - } - var ( code = codes.Internal // when header does not include HTTP status, return INTERNAL ok bool @@ -1344,7 +1358,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // These values can be set without any synchronization because // stream goroutine will read it only after seeing a closed // headerChan which we'll close after setting this. - s.recvCompress = encoding if len(mdata) > 0 { s.header = mdata } @@ -1363,7 +1376,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { rst := s.getState() == streamActive if statusGen == nil { - statusGen = status.New(codes.Code(int32(*(rawStatusCode))), rawStatusMsg) + statusGen = status.New(codes.Code(int32(*rawStatusCode)), rawStatusMsg) } t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true) } From 0804bd7983332bb369909a83fc3aa779ed88de3a Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sat, 15 Feb 2020 17:23:31 -0500 Subject: [PATCH 03/18] wip --- internal/transport/http2_client.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 8f5d7d21ef6..77a046f09dd 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1310,15 +1310,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { rawStatusCode = &code } } else { - var ( - code = codes.Internal // when header does not include HTTP status, return INTERNAL - ok bool - ) + code := codes.Internal // when header does not include HTTP status, return INTERNAL if httpStatus != nil { - code, ok = HTTPStatusConvTab[*(httpStatus)] - if !ok { - code = codes.Unknown + code = codes.Unknown + if c, ok := HTTPStatusConvTab[*(httpStatus)]; ok { + code = c } } From 650d1820a09f48d094e5927453b0a49d190104f5 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sun, 8 Mar 2020 16:09:02 -0400 Subject: [PATCH 04/18] Remove several local variables --- internal/transport/http2_client.go | 92 ++++++++++++++++++------------ 1 file changed, 56 insertions(+), 36 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 77a046f09dd..3a42f5d15fd 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1192,36 +1192,28 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { var ( // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. - isGRPC = !initialHeader - mdata = make(map[string][]string) - statusGen *status.Status - rawStatusCode *int - httpStatus *int - contentTypeErr string - rawStatusMsg string + isGRPC = !initialHeader + mdata = make(map[string][]string) + statusGen *status.Status ) for _, hf := range frame.Fields { switch hf.Name { case "content-type": - _, validContentType := contentSubtype(hf.Value) - if !validContentType { - contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", hf.Value) - break - } // TODO: do we want to propagate the whole content-type in the metadata, // or come up with a way to just propagate the content-subtype if it was set? // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} // in the metadata? + if _, validContentType := contentSubtype(hf.Value); validContentType { + isGRPC = true + } mdata[hf.Name] = append(mdata[hf.Name], hf.Value) - isGRPC = true case "grpc-encoding": if !endStream { s.recvCompress = hf.Value } case "grpc-status": - code, err := strconv.Atoi(hf.Value) - if err != nil { + if _, err := strconv.Atoi(hf.Value); err != nil { if isGRPC { err = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) @@ -1229,9 +1221,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } break } - rawStatusCode = &code + mdata["grpc-status"] = []string{hf.Value} case "grpc-message": - rawStatusMsg = decodeGrpcMessage(hf.Value) + mdata["grpc-message"] = []string{decodeGrpcMessage(hf.Value)} case "grpc-status-details-bin": v, err := decodeBinHeader(hf.Value) if err != nil { @@ -1253,8 +1245,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } statusGen = status.FromProto(pbStatus) case ":status": - code, err := strconv.Atoi(hf.Value) - if err != nil { + if _, err := strconv.Atoi(hf.Value); err != nil { if !isGRPC { err = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) @@ -1262,7 +1253,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } break } - httpStatus = &code + mdata[":status"] = []string{hf.Value} case "grpc-tags-bin": v, err := decodeBinHeader(hf.Value) if err != nil { @@ -1298,33 +1289,39 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } } - if isGRPC { - if rawStatusCode == nil && statusGen == nil { - // gRPC status doesn't exist. - // Set rawStatusCode to be unknown and return nil error. - // So that, if the stream has ended this Unknown status - // will be propagated to the user. - // Otherwise, it will be ignored. In which case, status from - // a later trailer, that has StreamEnded flag set, is propagated. - code := int(codes.Unknown) - rawStatusCode = &code + if !isGRPC { + var httpStatus *int + if httpStatuses := mdata[":status"]; len(httpStatuses) == 1 { + httpStatusCode, err := strconv.Atoi(httpStatuses[0]) + if err != nil { + err = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } + httpStatus = &httpStatusCode } - } else { - code := codes.Internal // when header does not include HTTP status, return INTERNAL + var code = codes.Internal // when header does not include HTTP status, return INTERNAL if httpStatus != nil { - code = codes.Unknown - if c, ok := HTTPStatusConvTab[*(httpStatus)]; ok { - code = c + var ok bool + code, ok = HTTPStatusConvTab[*(httpStatus)] + if !ok { + code = codes.Unknown } } + contentType := "missing" + if ct := mdata["content-type"]; len(ct) == 1 { + contentType = ct[0] + } + contentTypeErr := fmt.Sprintf("transport: received the unexpected content-type %q", contentType) msg := constructHTTPErrMsg(httpStatus, contentTypeErr) if err := status.Error(code, msg); err != nil { t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) return } } + delete(mdata, ":status") isHeader := false defer func() { @@ -1374,8 +1371,31 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { rst := s.getState() == streamActive if statusGen == nil { - statusGen = status.New(codes.Code(int32(*rawStatusCode)), rawStatusMsg) + var rawStatusMsg string + if rsm := mdata["grpc-message"]; len(rsm) == 1 { + rawStatusMsg = rsm[0] + } + // gRPC status doesn't exist. + // Set rawStatusCode to be unknown and return nil error. + // So that, if the stream has ended this Unknown status + // will be propagated to the user. + // Otherwise, it will be ignored. In which case, status from + // a later trailer, that has StreamEnded flag set, is propagated. + rawStatusCode := int32(codes.Unknown) + if rsc := mdata["grpc-status"]; len(rsc) == 1 { + code, err := strconv.Atoi(rsc[0]) + if err != nil { + err = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } + rawStatusCode = int32(code) + } + statusGen = status.New(codes.Code(rawStatusCode), rawStatusMsg) } + + delete(mdata, "grpc-message") + delete(mdata, "grpc-status") t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true) } From e9bbc9b6dfd28ed04a7659a604e93009e42a6d39 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sun, 8 Mar 2020 16:26:17 -0400 Subject: [PATCH 05/18] Cleanup redundant code --- internal/transport/http2_client.go | 60 ++++++++++-------------------- 1 file changed, 20 insertions(+), 40 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 3a42f5d15fd..7adc301a9c3 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -21,6 +21,7 @@ package transport import ( "context" "fmt" + spb "google.golang.org/genproto/googleapis/rpc/status" "io" "math" "net" @@ -34,7 +35,6 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" - spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" @@ -1194,7 +1194,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. isGRPC = !initialHeader mdata = make(map[string][]string) - statusGen *status.Status ) for _, hf := range frame.Fields { @@ -1213,46 +1212,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { s.recvCompress = hf.Value } case "grpc-status": - if _, err := strconv.Atoi(hf.Value); err != nil { - if isGRPC { - err = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) - return - } - break - } mdata["grpc-status"] = []string{hf.Value} case "grpc-message": mdata["grpc-message"] = []string{decodeGrpcMessage(hf.Value)} case "grpc-status-details-bin": - v, err := decodeBinHeader(hf.Value) - if err != nil { - if isGRPC { - err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) - return - } - break - } - pbStatus := &spb.Status{} - if err := proto.Unmarshal(v, pbStatus); err != nil { - if isGRPC { - err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) - return - } - break - } - statusGen = status.FromProto(pbStatus) + mdata[hf.Name] = append(mdata[hf.Name], hf.Value) case ":status": - if _, err := strconv.Atoi(hf.Value); err != nil { - if !isGRPC { - err = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) - return - } - break - } mdata[":status"] = []string{hf.Value} case "grpc-tags-bin": v, err := decodeBinHeader(hf.Value) @@ -1370,13 +1335,28 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // if client received END_STREAM from server while stream was still active, send RST_STREAM rst := s.getState() == streamActive - if statusGen == nil { + var statusGen *status.Status + if sg := mdata["grpc-status-details-bin"]; len(sg) == 1 { + v, err := decodeBinHeader(sg[0]) + if err != nil { + err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } + pbStatus := &spb.Status{} + if err := proto.Unmarshal(v, pbStatus); err != nil { + err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return + } + statusGen = status.FromProto(pbStatus) + } else { var rawStatusMsg string if rsm := mdata["grpc-message"]; len(rsm) == 1 { rawStatusMsg = rsm[0] } - // gRPC status doesn't exist. - // Set rawStatusCode to be unknown and return nil error. + // if gRPC status doesn't exist, Set rawStatusCode to be + // unknown and return nil error. // So that, if the stream has ended this Unknown status // will be propagated to the user. // Otherwise, it will be ignored. In which case, status from From 00df58bad5a36b56a4a2e36f5b034deaf99717a6 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sun, 8 Mar 2020 17:53:23 -0400 Subject: [PATCH 06/18] go fmt --- internal/transport/http2_client.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 7adc301a9c3..2544369ee90 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1192,8 +1192,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { var ( // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. - isGRPC = !initialHeader - mdata = make(map[string][]string) + isGRPC = !initialHeader + mdata = make(map[string][]string) ) for _, hf := range frame.Fields { @@ -1339,15 +1339,15 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if sg := mdata["grpc-status-details-bin"]; len(sg) == 1 { v, err := decodeBinHeader(sg[0]) if err != nil { - err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) - return + err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return } pbStatus := &spb.Status{} if err := proto.Unmarshal(v, pbStatus); err != nil { - err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) - return + err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + return } statusGen = status.FromProto(pbStatus) } else { From 44c840bf947fe9ed37f49bd4f0281c6dbb470c28 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sun, 8 Mar 2020 22:52:06 -0400 Subject: [PATCH 07/18] goimports --- internal/transport/http2_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 2544369ee90..79cfee1e59b 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -21,7 +21,6 @@ package transport import ( "context" "fmt" - spb "google.golang.org/genproto/googleapis/rpc/status" "io" "math" "net" @@ -34,6 +33,7 @@ import ( "github.com/golang/protobuf/proto" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" From bd4f7d27f5d93859eeb625901c350cff01c4b62f Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sun, 5 Apr 2020 21:49:16 -0400 Subject: [PATCH 08/18] Update to reduce code in loop --- internal/transport/http2_client.go | 118 ++++++++++++----------------- 1 file changed, 48 insertions(+), 70 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 12b844575a6..916f938359c 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1192,67 +1192,46 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - var ( - // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. - isGRPC = !initialHeader - mdata = make(map[string][]string) - ) + // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. + var mdata = make(map[string][]string) for _, hf := range frame.Fields { - switch hf.Name { - case "content-type": - // TODO: do we want to propagate the whole content-type in the metadata, - // or come up with a way to just propagate the content-subtype if it was set? - // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} - // in the metadata? - if _, validContentType := contentSubtype(hf.Value); validContentType { - isGRPC = true - } - mdata[hf.Name] = append(mdata[hf.Name], hf.Value) - case "grpc-encoding": - if !endStream { - s.recvCompress = hf.Value - } - case "grpc-status": - mdata["grpc-status"] = []string{hf.Value} - case "grpc-message": - mdata["grpc-message"] = []string{decodeGrpcMessage(hf.Value)} - case "grpc-status-details-bin": - mdata[hf.Name] = append(mdata[hf.Name], hf.Value) - case ":status": - mdata[":status"] = []string{hf.Value} - case "grpc-tags-bin": - v, err := decodeBinHeader(hf.Value) - if err != nil { - if isGRPC { - err = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) - return - } - break - } - mdata[hf.Name] = append(mdata[hf.Name], string(v)) - case "grpc-trace-bin": - v, err := decodeBinHeader(hf.Value) - if err != nil { - if isGRPC { - err = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) - return - } - break - } - mdata[hf.Name] = append(mdata[hf.Name], string(v)) - default: - if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) { - break - } - v, err := decodeMetadataHeader(hf.Name, hf.Value) - if err != nil { - errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) - break - } - mdata[hf.Name] = append(mdata[hf.Name], v) + v, err := decodeMetadataHeader(hf.Name, hf.Value) + if err != nil { + errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) + mdata[hf.Name] = []string{} + break + } + mdata[hf.Name] = append(mdata[hf.Name], v) + } + + var isGRPC = !initialHeader + // TODO: do we want to propagate the whole content-type in the metadata, + // or come up with a way to just propagate the content-subtype if it was set? + // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} + // in the metadata? + if contentTypes := mdata["content-type"]; len(contentTypes) == 1 { + if _, ok := contentSubtype(contentTypes[0]); ok { + isGRPC = true + } + } + if grpcEncoding := mdata["grpc-encoding"]; len(grpcEncoding) == 1 && !endStream { + s.recvCompress = grpcEncoding[0] + } + if grpcMessages := mdata["grpc-message"]; len(grpcMessages) == 1 { + mdata["grpc-message"] = []string{decodeGrpcMessage(grpcMessages[0])} + } + + if isGRPC { + if tagsBin, ok := mdata["grpc-tags-bin"]; ok && len(tagsBin) == 0 { + statusErr := status.Error(codes.Internal, "transport: malformed grpc-tags-bin") + t.closeStream(s, statusErr, true, http2.ErrCodeProtocol, status.Convert(statusErr), nil, endStream) + return + } + if traceBin, ok := mdata["grpc-trace-bin"]; ok && len(traceBin) == 0 { + statusErr := status.Error(codes.Internal, "transport: malformed grpc-trace-bin") + t.closeStream(s, statusErr, true, http2.ErrCodeProtocol, status.Convert(statusErr), nil, endStream) + return } } @@ -1335,25 +1314,22 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - // if client received END_STREAM from server while stream was still active, send RST_STREAM - rst := s.getState() == streamActive - var statusGen *status.Status - if sg := mdata["grpc-status-details-bin"]; len(sg) == 1 { - v, err := decodeBinHeader(sg[0]) - if err != nil { - err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + if sg, ok := mdata["grpc-status-details-bin"]; ok { + if len(sg) == 0 { + statusErr := status.Error(codes.Internal, "transport: malformed grpc-status-details-bin") + t.closeStream(s, statusErr, true, http2.ErrCodeProtocol, status.Convert(statusErr), nil, endStream) return } pbStatus := &spb.Status{} - if err := proto.Unmarshal(v, pbStatus); err != nil { + if err := proto.Unmarshal([]byte(sg[0]), pbStatus); err != nil { err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) return } statusGen = status.FromProto(pbStatus) - } else { + } + if statusGen == nil { var rawStatusMsg string if rsm := mdata["grpc-message"]; len(rsm) == 1 { rawStatusMsg = rsm[0] @@ -1376,9 +1352,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } statusGen = status.New(codes.Code(rawStatusCode), rawStatusMsg) } - delete(mdata, "grpc-message") delete(mdata, "grpc-status") + + // if client received END_STREAM from server while stream was still active, send RST_STREAM + rst := s.getState() == streamActive t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true) } From bc568f69240b589c8430a7ff3f9f381737ddfd33 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sun, 5 Apr 2020 22:07:08 -0400 Subject: [PATCH 09/18] create statuses, instead of converting them --- internal/transport/http2_client.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 916f938359c..5f3b04981fd 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1187,8 +1187,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // frame.Truncated is set to true when framer detects that the current header // list size hits MaxHeaderListSize limit. if frame.Truncated { - err := status.Error(codes.Internal, "peer header list size exceeded limit") - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + se := status.New(codes.Internal, "peer header list size exceeded limit") + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } @@ -1224,13 +1224,13 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if isGRPC { if tagsBin, ok := mdata["grpc-tags-bin"]; ok && len(tagsBin) == 0 { - statusErr := status.Error(codes.Internal, "transport: malformed grpc-tags-bin") - t.closeStream(s, statusErr, true, http2.ErrCodeProtocol, status.Convert(statusErr), nil, endStream) + se := status.New(codes.Internal, "transport: malformed grpc-tags-bin") + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } if traceBin, ok := mdata["grpc-trace-bin"]; ok && len(traceBin) == 0 { - statusErr := status.Error(codes.Internal, "transport: malformed grpc-trace-bin") - t.closeStream(s, statusErr, true, http2.ErrCodeProtocol, status.Convert(statusErr), nil, endStream) + se := status.New(codes.Internal, "transport: malformed grpc-trace-bin") + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } } @@ -1240,8 +1240,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if httpStatuses := mdata[":status"]; len(httpStatuses) == 1 { httpStatusCode, err := strconv.Atoi(httpStatuses[0]) if err != nil { - err = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + se := status.Newf(codes.Internal, "transport: malformed http-status: %v", err) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } httpStatus = &httpStatusCode @@ -1317,14 +1317,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { var statusGen *status.Status if sg, ok := mdata["grpc-status-details-bin"]; ok { if len(sg) == 0 { - statusErr := status.Error(codes.Internal, "transport: malformed grpc-status-details-bin") - t.closeStream(s, statusErr, true, http2.ErrCodeProtocol, status.Convert(statusErr), nil, endStream) + se := status.New(codes.Internal, "transport: malformed grpc-status-details-bin") + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } pbStatus := &spb.Status{} if err := proto.Unmarshal([]byte(sg[0]), pbStatus); err != nil { - err = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + se := status.Newf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } statusGen = status.FromProto(pbStatus) @@ -1344,8 +1344,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if rsc := mdata["grpc-status"]; len(rsc) == 1 { code, err := strconv.Atoi(rsc[0]) if err != nil { - err = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) + se := status.Newf(codes.Internal, "transport: malformed grpc-status: %v", err) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } rawStatusCode = int32(code) From e8ca122a1f0b8b661fc439de33692d3075e71a99 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Wed, 16 Sep 2020 16:07:38 -0400 Subject: [PATCH 10/18] Integrate component logging change from #3617 --- internal/transport/http2_client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index fddd8313194..3b78735fa9f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1218,7 +1218,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { for _, hf := range frame.Fields { v, err := decodeMetadataHeader(hf.Name, hf.Value) if err != nil { - errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) + if logger.V(logLevel) { + logger.Errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) + } mdata[hf.Name] = []string{} break } From ce409bf40ab1ee34e34a713e63a51504ecd728f1 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Wed, 16 Sep 2020 16:27:03 -0400 Subject: [PATCH 11/18] remove unused method --- internal/transport/http_util.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 44b2559b012..6b551354730 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -185,14 +185,6 @@ func isWhitelistedHeader(hdr string) bool { } } -func (d *decodeState) status() *status.Status { - if d.data.statusGen == nil { - // No status-details were provided; generate status using code/msg. - d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg) - } - return d.data.statusGen -} - const binHdrSuffix = "-bin" func encodeBinHeader(v []byte) string { From 268cd71278a495c7cc3d78e9f7a739621f6b46b8 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Tue, 4 May 2021 21:59:09 -0400 Subject: [PATCH 12/18] less map lookups --- internal/transport/http2_client.go | 200 +++++++++++++++-------------- internal/transport/http_util.go | 16 +-- 2 files changed, 104 insertions(+), 112 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 5923eec6d55..3fbfc6473ef 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -30,8 +30,10 @@ import ( "sync/atomic" "time" + "github.com/golang/protobuf/proto" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal/channelz" @@ -1254,77 +1256,124 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - //state := &decodeState{} - //// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. - //state.data.isGRPC = !initialHeader - //if h2code, err := state.decodeHeader(frame); err != nil { - // t.closeStream(s, err, true, h2code, status.Convert(err), nil, endStream) - // return // frame.Truncated is set to true when framer detects that the current header // list size hits MaxHeaderListSize limit. if frame.Truncated { se := status.New(codes.Internal, "peer header list size exceeded limit") - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream) return } // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. - var mdata = make(map[string][]string) + var ( + contentTypeErr string + grpcErr error + httpErr error - for _, hf := range frame.Fields { - v, err := decodeMetadataHeader(hf.Name, hf.Value) - if err != nil { - if logger.V(logLevel) { - logger.Errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) - } - mdata[hf.Name] = []string{} - break - } - mdata[hf.Name] = append(mdata[hf.Name], v) - } + grpcMessage string + rawStatusCode *int + statusGen *status.Status + + httpStatus *int + mdata = make(map[string][]string) + ) var isGRPC = !initialHeader - // TODO: do we want to propagate the whole content-type in the metadata, - // or come up with a way to just propagate the content-subtype if it was set? - // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} - // in the metadata? - if contentTypes := mdata["content-type"]; len(contentTypes) == 1 { - if _, ok := grpcutil.ContentSubtype(contentTypes[0]); ok { + for _, hf := range frame.Fields { + switch hf.Name { + case "content-type": + if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType { + contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", hf.Value) + break + } + // TODO: do we want to propagate the whole content-type in the metadata, + // or come up with a way to just propagate the content-subtype if it was set? + // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} + // in the metadata? + mdata[hf.Name] = append(mdata[hf.Name], hf.Value) isGRPC = true + case "grpc-encoding": + s.recvCompress = hf.Value + case "grpc-status": + code, err := strconv.Atoi(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + break + } + rawStatusCode = &code + case "grpc-message": + grpcMessage = decodeGrpcMessage(hf.Value) + case "grpc-status-details-bin": + v, err := decodeBinHeader(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + break + } + st := &spb.Status{} + if err := proto.Unmarshal(v, st); err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + break + } + statusGen = status.FromProto(st) + case ":status": + code, err := strconv.Atoi(hf.Value) + if err != nil { + httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) + break + } + httpStatus = &code + case "grpc-tags-bin": + v, err := decodeBinHeader(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) + break + } + mdata[hf.Name] = append(mdata[hf.Name], string(v)) + case "grpc-trace-bin": + v, err := decodeBinHeader(hf.Value) + if err != nil { + grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) + break + } + mdata[hf.Name] = append(mdata[hf.Name], string(v)) + default: + if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) { + break + } + v, err := decodeMetadataHeader(hf.Name, hf.Value) + if err != nil { + if logger.V(logLevel) { + logger.Errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) + } + break + } + mdata[hf.Name] = append(mdata[hf.Name], v) } } - if grpcEncoding := mdata["grpc-encoding"]; len(grpcEncoding) == 1 && !endStream { - s.recvCompress = grpcEncoding[0] - } - if grpcMessages := mdata["grpc-message"]; len(grpcMessages) == 1 { - mdata["grpc-message"] = []string{decodeGrpcMessage(grpcMessages[0])} - } if isGRPC { - if tagsBin, ok := mdata["grpc-tags-bin"]; ok && len(tagsBin) == 0 { - se := status.New(codes.Internal, "transport: malformed grpc-tags-bin") - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + if grpcErr != nil { + t.closeStream(s, grpcErr, true, http2.ErrCodeProtocol, status.Convert(grpcErr), nil, endStream) return } - if traceBin, ok := mdata["grpc-trace-bin"]; ok && len(traceBin) == 0 { - se := status.New(codes.Internal, "transport: malformed grpc-trace-bin") - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) - return + // gRPC status doesn't exist. + // Set rawStatusCode to be unknown and return nil error. + // So that, if the stream has ended this Unknown status + // will be propagated to the user. + // Otherwise, it will be ignored. In which case, status from + // a later trailer, that has StreamEnded flag set, is propagated. + if rawStatusCode == nil && statusGen == nil { + code := int(codes.Unknown) + rawStatusCode = &code } } - if !isGRPC { - var httpStatus *int - if httpStatuses := mdata[":status"]; len(httpStatuses) == 1 { - httpStatusCode, err := strconv.Atoi(httpStatuses[0]) - if err != nil { - se := status.Newf(codes.Internal, "transport: malformed http-status: %v", err) - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) - return - } - httpStatus = &httpStatusCode - } + if httpErr != nil { + t.closeStream(s, httpErr, true, http2.ErrCodeProtocol, status.Convert(httpErr), nil, endStream) + return + } + if !isGRPC{ var code = codes.Internal // when header does not include HTTP status, return INTERNAL if httpStatus != nil { var ok bool @@ -1333,19 +1382,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { code = codes.Unknown } } - - contentType := "missing" - if ct := mdata["content-type"]; len(ct) == 1 { - contentType = ct[0] - } - contentTypeErr := fmt.Sprintf("transport: received the unexpected content-type %q", contentType) - msg := constructHTTPErrMsg(httpStatus, contentTypeErr) - if err := status.Error(code, msg); err != nil { + if err := status.Error(code, constructHTTPErrMsg( + httpStatus, + contentTypeErr, + )); err != nil { t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) return } } - delete(mdata, ":status") isHeader := false defer func() { @@ -1378,7 +1422,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // These values can be set without any synchronization because // stream goroutine will read it only after seeing a closed // headerChan which we'll close after setting this. - s.recvCompress = state.data.encoding if len(mdata) > 0 { s.header = mdata } @@ -1393,46 +1436,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - var statusGen *status.Status - if sg, ok := mdata["grpc-status-details-bin"]; ok { - if len(sg) == 0 { - se := status.New(codes.Internal, "transport: malformed grpc-status-details-bin") - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) - return - } - pbStatus := &spb.Status{} - if err := proto.Unmarshal([]byte(sg[0]), pbStatus); err != nil { - se := status.Newf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) - return - } - statusGen = status.FromProto(pbStatus) - } if statusGen == nil { - var rawStatusMsg string - if rsm := mdata["grpc-message"]; len(rsm) == 1 { - rawStatusMsg = rsm[0] - } - // if gRPC status doesn't exist, Set rawStatusCode to be - // unknown and return nil error. - // So that, if the stream has ended this Unknown status - // will be propagated to the user. - // Otherwise, it will be ignored. In which case, status from - // a later trailer, that has StreamEnded flag set, is propagated. - rawStatusCode := int32(codes.Unknown) - if rsc := mdata["grpc-status"]; len(rsc) == 1 { - code, err := strconv.Atoi(rsc[0]) - if err != nil { - se := status.Newf(codes.Internal, "transport: malformed grpc-status: %v", err) - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) - return - } - rawStatusCode = int32(code) - } - statusGen = status.New(codes.Code(rawStatusCode), rawStatusMsg) + statusGen = status.New(codes.Code(int32(*(rawStatusCode))), grpcMessage) } - delete(mdata, "grpc-message") - delete(mdata, "grpc-status") // if client received END_STREAM from server while stream was still active, send RST_STREAM rst := s.getState() == streamActive diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index a972870ab1b..efa50b3482f 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -263,21 +263,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode // constructErrMsg constructs error message to be returned in HTTP fallback mode. // Format: HTTP status code and its corresponding message + content-type error message. func (d *decodeState) constructHTTPErrMsg() string { - var errMsgs []string - - if d.data.httpStatus == nil { - errMsgs = append(errMsgs, "malformed header: missing HTTP status") - } else { - errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus)) - } - - if d.data.contentTypeErr == "" { - errMsgs = append(errMsgs, "transport: missing content-type field") - } else { - errMsgs = append(errMsgs, d.data.contentTypeErr) - } - - return strings.Join(errMsgs, "; ") + return constructHTTPErrMsg(d.data.httpStatus, d.data.contentTypeErr) } // constructErrMsg constructs error message to be returned in HTTP fallback mode. From eac772e7fbbfdebcef360aefcfd500f269a2af28 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Tue, 4 May 2021 23:57:53 -0400 Subject: [PATCH 13/18] go vet & create statuses instead of strings --- internal/transport/http2_client.go | 53 +++++++++++++----------------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 3fbfc6473ef..3c94c69d7fb 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1264,21 +1264,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. var ( + // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. + isGRPC = !initialHeader + mdata = make(map[string][]string) contentTypeErr string - grpcErr error - httpErr error - grpcMessage string - rawStatusCode *int + grpcStatus *status.Status statusGen *status.Status - - httpStatus *int - mdata = make(map[string][]string) + rawStatusCode *int + httpStatus *int ) - - var isGRPC = !initialHeader for _, hf := range frame.Fields { switch hf.Name { case "content-type": @@ -1297,7 +1293,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { case "grpc-status": code, err := strconv.Atoi(hf.Value) if err != nil { - grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err) + grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err)) break } rawStatusCode = &code @@ -1306,33 +1302,33 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { case "grpc-status-details-bin": v, err := decodeBinHeader(hf.Value) if err != nil { - grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)) break } st := &spb.Status{} if err := proto.Unmarshal(v, st); err != nil { - grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) + grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)) break } statusGen = status.FromProto(st) case ":status": code, err := strconv.Atoi(hf.Value) if err != nil { - httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err) + grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err)) break } httpStatus = &code case "grpc-tags-bin": v, err := decodeBinHeader(hf.Value) if err != nil { - grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) + grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-tags-bin: %v", err)) break } mdata[hf.Name] = append(mdata[hf.Name], string(v)) case "grpc-trace-bin": v, err := decodeBinHeader(hf.Value) if err != nil { - grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) + grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-trace-bin: %v", err)) break } mdata[hf.Name] = append(mdata[hf.Name], string(v)) @@ -1349,31 +1345,26 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } mdata[hf.Name] = append(mdata[hf.Name], v) } - } - if isGRPC { - if grpcErr != nil { - t.closeStream(s, grpcErr, true, http2.ErrCodeProtocol, status.Convert(grpcErr), nil, endStream) + // Account for the fact that we can set isGRPC at any time - it's possible + // that we'll set isGRPC AFTER an error has occurred. + if isGRPC && grpcStatus != nil { + t.closeStream(s, grpcStatus.Err(), true, http2.ErrCodeProtocol, grpcStatus, nil, endStream) return } + } + + if isGRPC { // gRPC status doesn't exist. - // Set rawStatusCode to be unknown and return nil error. - // So that, if the stream has ended this Unknown status - // will be propagated to the user. + // Set rawStatusCode to be unknown so that, if the stream has ended this + // Unknown status will be propagated to the user. // Otherwise, it will be ignored. In which case, status from // a later trailer, that has StreamEnded flag set, is propagated. if rawStatusCode == nil && statusGen == nil { code := int(codes.Unknown) rawStatusCode = &code } - } - - if httpErr != nil { - t.closeStream(s, httpErr, true, http2.ErrCodeProtocol, status.Convert(httpErr), nil, endStream) - return - } - - if !isGRPC{ + } else { var code = codes.Internal // when header does not include HTTP status, return INTERNAL if httpStatus != nil { var ok bool From b4e27f6f4dd2d64e9716e82d11030d73b8ee7803 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Wed, 5 May 2021 20:12:30 -0400 Subject: [PATCH 14/18] Address PR comments --- internal/transport/http2_client.go | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 3c94c69d7fb..1ee3f4f3b9f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1272,7 +1272,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { grpcMessage string grpcStatus *status.Status statusGen *status.Status - rawStatusCode *int + rawStatusCode = codes.Unknown httpStatus *int ) for _, hf := range frame.Fields { @@ -1296,7 +1296,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err)) break } - rawStatusCode = &code + rawStatusCode = codes.Code(uint32(code)) case "grpc-message": grpcMessage = decodeGrpcMessage(hf.Value) case "grpc-status-details-bin": @@ -1354,32 +1354,18 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } } - if isGRPC { - // gRPC status doesn't exist. - // Set rawStatusCode to be unknown so that, if the stream has ended this - // Unknown status will be propagated to the user. - // Otherwise, it will be ignored. In which case, status from - // a later trailer, that has StreamEnded flag set, is propagated. - if rawStatusCode == nil && statusGen == nil { - code := int(codes.Unknown) - rawStatusCode = &code - } - } else { + if !isGRPC { var code = codes.Internal // when header does not include HTTP status, return INTERNAL if httpStatus != nil { var ok bool - code, ok = HTTPStatusConvTab[*(httpStatus)] + code, ok = HTTPStatusConvTab[*httpStatus] if !ok { code = codes.Unknown } } - if err := status.Error(code, constructHTTPErrMsg( - httpStatus, - contentTypeErr, - )); err != nil { - t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) - return - } + se := status.New(code, constructHTTPErrMsg(httpStatus, contentTypeErr)) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + return } isHeader := false @@ -1428,7 +1414,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } if statusGen == nil { - statusGen = status.New(codes.Code(int32(*(rawStatusCode))), grpcMessage) + statusGen = status.New(rawStatusCode, grpcMessage) } // if client received END_STREAM from server while stream was still active, send RST_STREAM From 1ddc6ef4b1a65a95b5a6b33603588b789f26def1 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Wed, 5 May 2021 20:47:01 -0400 Subject: [PATCH 15/18] only process http status code when not in grpc mode --- internal/transport/http2_client.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 1ee3f4f3b9f..5d9ea1570de 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1273,8 +1273,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { grpcStatus *status.Status statusGen *status.Status rawStatusCode = codes.Unknown - httpStatus *int ) + for _, hf := range frame.Fields { switch hf.Name { case "content-type": @@ -1312,12 +1312,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } statusGen = status.FromProto(st) case ":status": - code, err := strconv.Atoi(hf.Value) - if err != nil { - grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err)) - break - } - httpStatus = &code + // we only process :status if this is not grpc so no need to ALWAYS convert the value + mdata[hf.Name] = append(mdata[hf.Name], hf.Value) case "grpc-tags-bin": v, err := decodeBinHeader(hf.Value) if err != nil { @@ -1355,19 +1351,27 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } if !isGRPC { - var code = codes.Internal // when header does not include HTTP status, return INTERNAL - if httpStatus != nil { - var ok bool - code, ok = HTTPStatusConvTab[*httpStatus] + c := codes.Internal // when header does not include HTTP status, return INTERNAL + if httpStatuses, ok := mdata[":status"]; ok && len(httpStatuses) > 0 { + httpStatus, err := strconv.Atoi(httpStatuses[0]) + if err != nil { + se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err)) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + return + } + c, ok = HTTPStatusConvTab[httpStatus] if !ok { - code = codes.Unknown + c = codes.Unknown } } - se := status.New(code, constructHTTPErrMsg(httpStatus, contentTypeErr)) + sc := int(c) + se := status.New(c, constructHTTPErrMsg(&sc, contentTypeErr)) t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } + delete(mdata, ":status") + isHeader := false defer func() { if t.statsHandler != nil { From 3e4245e63bb7284b493208bb52b311b7d73c71b4 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Sat, 8 May 2021 21:59:37 -0400 Subject: [PATCH 16/18] address code review comments --- internal/transport/http2_client.go | 96 +++++++++++++----------------- internal/transport/http_util.go | 12 ++++ 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 5d9ea1570de..1fee8e39dd8 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -30,10 +30,8 @@ import ( "sync/atomic" "time" - "github.com/golang/protobuf/proto" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" - spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal/channelz" @@ -1265,14 +1263,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } var ( - // If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. + // If a gRPC Response-Headers has already been received, then it means + // that the peer is speaking gRPC and we are in gRPC mode. isGRPC = !initialHeader mdata = make(map[string][]string) contentTypeErr string grpcMessage string - grpcStatus *status.Status statusGen *status.Status - rawStatusCode = codes.Unknown + + grpcStatus string + httpStatus string + rawStatus string ) for _, hf := range frame.Fields { @@ -1282,61 +1283,31 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", hf.Value) break } - // TODO: do we want to propagate the whole content-type in the metadata, - // or come up with a way to just propagate the content-subtype if it was set? - // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"} - // in the metadata? mdata[hf.Name] = append(mdata[hf.Name], hf.Value) isGRPC = true case "grpc-encoding": s.recvCompress = hf.Value case "grpc-status": - code, err := strconv.Atoi(hf.Value) - if err != nil { - grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err)) - break - } - rawStatusCode = codes.Code(uint32(code)) + rawStatus = hf.Value case "grpc-message": grpcMessage = decodeGrpcMessage(hf.Value) case "grpc-status-details-bin": - v, err := decodeBinHeader(hf.Value) + sg, err := decodeGRPCStatusDetails(hf.Value) if err != nil { - grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)) - break - } - st := &spb.Status{} - if err := proto.Unmarshal(v, st); err != nil { - grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)) + grpcStatus = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err) break } - statusGen = status.FromProto(st) + statusGen = sg case ":status": - // we only process :status if this is not grpc so no need to ALWAYS convert the value - mdata[hf.Name] = append(mdata[hf.Name], hf.Value) - case "grpc-tags-bin": - v, err := decodeBinHeader(hf.Value) - if err != nil { - grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-tags-bin: %v", err)) - break - } - mdata[hf.Name] = append(mdata[hf.Name], string(v)) - case "grpc-trace-bin": - v, err := decodeBinHeader(hf.Value) - if err != nil { - grpcStatus = status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-trace-bin: %v", err)) - break - } - mdata[hf.Name] = append(mdata[hf.Name], string(v)) + httpStatus = hf.Value default: if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) { break } v, err := decodeMetadataHeader(hf.Name, hf.Value) if err != nil { - if logger.V(logLevel) { - logger.Errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) - } + grpcStatus = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err) + logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) break } mdata[hf.Name] = append(mdata[hf.Name], v) @@ -1344,34 +1315,41 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { // Account for the fact that we can set isGRPC at any time - it's possible // that we'll set isGRPC AFTER an error has occurred. - if isGRPC && grpcStatus != nil { - t.closeStream(s, grpcStatus.Err(), true, http2.ErrCodeProtocol, grpcStatus, nil, endStream) + if isGRPC && grpcStatus != "" { + se := status.New(codes.Internal, grpcStatus) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } } if !isGRPC { - c := codes.Internal // when header does not include HTTP status, return INTERNAL - if httpStatuses, ok := mdata[":status"]; ok && len(httpStatuses) > 0 { - httpStatus, err := strconv.Atoi(httpStatuses[0]) + var ( + code = codes.Internal // when header does not include HTTP status, return INTERNAL + httpStatusCode int + ) + + if httpStatus != "" { + c, err := strconv.ParseInt(httpStatus, 10, 32) if err != nil { se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err)) t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } - c, ok = HTTPStatusConvTab[httpStatus] + httpStatusCode = int(c) + + var ok bool + code, ok = HTTPStatusConvTab[httpStatusCode] if !ok { - c = codes.Unknown + code = codes.Unknown } } - sc := int(c) - se := status.New(c, constructHTTPErrMsg(&sc, contentTypeErr)) - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + + // Verify the HTTP response is a 200. + se := status.Error(code, constructHTTPErrMsg(&httpStatusCode, contentTypeErr)) + t.closeStream(s, se, true, http2.ErrCodeProtocol, status.Convert(se), nil, endStream) return } - delete(mdata, ":status") - isHeader := false defer func() { if t.statsHandler != nil { @@ -1418,6 +1396,16 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } if statusGen == nil { + rawStatusCode := codes.Unknown + if rawStatus != "" { + code, err := strconv.ParseInt(rawStatus, 10, 32) + if err != nil { + se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err)) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + return + } + rawStatusCode = codes.Code(uint32(code)) + } statusGen = status.New(rawStatusCode, grpcMessage) } diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index efa50b3482f..2771e224f7b 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -209,6 +209,18 @@ func decodeMetadataHeader(k, v string) (string, error) { return v, nil } +func decodeGRPCStatusDetails(rawDetails string) (*status.Status, error) { + v, err := decodeBinHeader(rawDetails) + if err != nil { + return nil, err + } + st := &spb.Status{} + if err = proto.Unmarshal(v, st); err != nil { + return nil, err + } + return status.FromProto(st), nil +} + func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode, error) { // frame.Truncated is set to true when framer detects that the current header // list size hits MaxHeaderListSize limit. From fef76c28c3e38a3318b230d81b2450b2e73a32c9 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Tue, 11 May 2021 16:20:20 -0400 Subject: [PATCH 17/18] Address PR comments --- internal/transport/http2_client.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 1fee8e39dd8..4380aae1d8f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1271,9 +1271,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { grpcMessage string statusGen *status.Status - grpcStatus string httpStatus string rawStatus string + // headerError is set if an error is encountered while parsing the headers + headerError string ) for _, hf := range frame.Fields { @@ -1292,12 +1293,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { case "grpc-message": grpcMessage = decodeGrpcMessage(hf.Value) case "grpc-status-details-bin": - sg, err := decodeGRPCStatusDetails(hf.Value) + var err error + statusGen, err = decodeGRPCStatusDetails(hf.Value) if err != nil { - grpcStatus = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err) - break + headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err) } - statusGen = sg case ":status": httpStatus = hf.Value default: @@ -1306,20 +1306,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } v, err := decodeMetadataHeader(hf.Name, hf.Value) if err != nil { - grpcStatus = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err) + headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err) logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) break } mdata[hf.Name] = append(mdata[hf.Name], v) } - - // Account for the fact that we can set isGRPC at any time - it's possible - // that we'll set isGRPC AFTER an error has occurred. - if isGRPC && grpcStatus != "" { - se := status.New(codes.Internal, grpcStatus) - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) - return - } } if !isGRPC { @@ -1350,6 +1342,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } + if headerError != "" { + se := status.New(codes.Internal, headerError) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + return + } + isHeader := false defer func() { if t.statsHandler != nil { From c1095bd8fee3e4ac5e57f3f8b861b5f0085cc991 Mon Sep 17 00:00:00 2001 From: James Protzman Date: Tue, 11 May 2021 20:03:32 -0400 Subject: [PATCH 18/18] Don't use status.convert() --- internal/transport/http2_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 4380aae1d8f..64ebd4a167f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1337,8 +1337,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } // Verify the HTTP response is a 200. - se := status.Error(code, constructHTTPErrMsg(&httpStatusCode, contentTypeErr)) - t.closeStream(s, se, true, http2.ErrCodeProtocol, status.Convert(se), nil, endStream) + se := status.New(code, constructHTTPErrMsg(&httpStatusCode, contentTypeErr)) + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return }