Skip to content

Commit

Permalink
transport: remove decodeState from client to reduce allocations (#3313)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNProtzman committed May 12, 2021
1 parent 62adda2 commit a95a5c3
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 21 deletions.
117 changes: 108 additions & 9 deletions internal/transport/http2_client.go
Expand Up @@ -1254,11 +1254,97 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

state := &decodeState{}
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
state.data.isGRPC = !initialHeader
if h2code, err := state.decodeHeader(frame); err != nil {
t.closeStream(s, err, true, h2code, status.Convert(err), nil, endStream)
// frame.Truncated is set to true when framer detects that the current header
// list size hits MaxHeaderListSize limit.
if frame.Truncated {
se := status.New(codes.Internal, "peer header list size exceeded limit")
t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
return
}

var (
// If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode.
isGRPC = !initialHeader
mdata = make(map[string][]string)
contentTypeErr string
grpcMessage string
statusGen *status.Status

httpStatus string
rawStatus string
// headerError is set if an error is encountered while parsing the headers
headerError string
)

for _, hf := range frame.Fields {
switch hf.Name {
case "content-type":
if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", hf.Value)
break
}
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
isGRPC = true
case "grpc-encoding":
s.recvCompress = hf.Value
case "grpc-status":
rawStatus = hf.Value
case "grpc-message":
grpcMessage = decodeGrpcMessage(hf.Value)
case "grpc-status-details-bin":
var err error
statusGen, err = decodeGRPCStatusDetails(hf.Value)
if err != nil {
headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
}
case ":status":
httpStatus = hf.Value
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], v)
}
}

if !isGRPC {
var (
code = codes.Internal // when header does not include HTTP status, return INTERNAL
httpStatusCode int
)

if httpStatus != "" {
c, err := strconv.ParseInt(httpStatus, 10, 32)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
httpStatusCode = int(c)

var ok bool
code, ok = HTTPStatusConvTab[httpStatusCode]
if !ok {
code = codes.Unknown
}
}

// Verify the HTTP response is a 200.
se := status.New(code, constructHTTPErrMsg(&httpStatusCode, contentTypeErr))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}

if headerError != "" {
se := status.New(codes.Internal, headerError)
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}

Expand Down Expand Up @@ -1293,9 +1379,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
s.recvCompress = state.data.encoding
if len(state.data.mdata) > 0 {
s.header = state.data.mdata
if len(mdata) > 0 {
s.header = mdata
}
} else {
// HEADERS frame block carries a Trailers-Only.
Expand All @@ -1308,9 +1393,23 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

if statusGen == nil {
rawStatusCode := codes.Unknown
if rawStatus != "" {
code, err := strconv.ParseInt(rawStatus, 10, 32)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
rawStatusCode = codes.Code(uint32(code))
}
statusGen = status.New(rawStatusCode, grpcMessage)
}

// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
}

// reader runs as a separate goroutine in charge of reading data from network
Expand Down
34 changes: 22 additions & 12 deletions internal/transport/http_util.go
Expand Up @@ -180,14 +180,6 @@ func isWhitelistedHeader(hdr string) bool {
}
}

func (d *decodeState) status() *status.Status {
if d.data.statusGen == nil {
// No status-details were provided; generate status using code/msg.
d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg)
}
return d.data.statusGen
}

const binHdrSuffix = "-bin"

func encodeBinHeader(v []byte) string {
Expand Down Expand Up @@ -217,6 +209,18 @@ func decodeMetadataHeader(k, v string) (string, error) {
return v, nil
}

func decodeGRPCStatusDetails(rawDetails string) (*status.Status, error) {
v, err := decodeBinHeader(rawDetails)
if err != nil {
return nil, err
}
st := &spb.Status{}
if err = proto.Unmarshal(v, st); err != nil {
return nil, err
}
return status.FromProto(st), nil
}

func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode, error) {
// frame.Truncated is set to true when framer detects that the current header
// list size hits MaxHeaderListSize limit.
Expand Down Expand Up @@ -271,18 +275,24 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode
// constructErrMsg constructs error message to be returned in HTTP fallback mode.
// Format: HTTP status code and its corresponding message + content-type error message.
func (d *decodeState) constructHTTPErrMsg() string {
return constructHTTPErrMsg(d.data.httpStatus, d.data.contentTypeErr)
}

// constructErrMsg constructs error message to be returned in HTTP fallback mode.
// Format: HTTP status code and its corresponding message + content-type error message.
func constructHTTPErrMsg(httpStatus *int, contentTypeErr string) string {
var errMsgs []string

if d.data.httpStatus == nil {
if httpStatus == nil {
errMsgs = append(errMsgs, "malformed header: missing HTTP status")
} else {
errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus))
errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(httpStatus)), *httpStatus))
}

if d.data.contentTypeErr == "" {
if contentTypeErr == "" {
errMsgs = append(errMsgs, "transport: missing content-type field")
} else {
errMsgs = append(errMsgs, d.data.contentTypeErr)
errMsgs = append(errMsgs, contentTypeErr)
}

return strings.Join(errMsgs, "; ")
Expand Down

0 comments on commit a95a5c3

Please sign in to comment.