Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: remove decodeState from client to reduce allocations #3313

Merged
merged 34 commits into from May 12, 2021
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a4a5f28
transport: remove decodeState from http_client
JNProtzman Jan 12, 2020
cd907d6
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman Jan 26, 2020
3f62a6e
Remove grpcErr & httpErr
JNProtzman Jan 27, 2020
88d6557
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Feb 15, 2020
0804bd7
wip
JNProtzman Feb 15, 2020
7850d79
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Mar 8, 2020
650d182
Remove several local variables
JNProtzman Mar 8, 2020
e9bbc9b
Cleanup redundant code
JNProtzman Mar 8, 2020
00df58b
go fmt
JNProtzman Mar 8, 2020
44c840b
goimports
JNProtzman Mar 9, 2020
5b3ab1c
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Mar 30, 2020
515d41c
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Apr 6, 2020
bd4f7d2
Update to reduce code in loop
JNProtzman Apr 6, 2020
bc568f6
create statuses, instead of converting them
JNProtzman Apr 6, 2020
ebc4dc1
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Apr 15, 2020
c1fb093
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Apr 30, 2020
ae83159
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman May 2, 2020
32a2ae1
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman May 30, 2020
80a9c2e
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Jul 2, 2020
e8ca122
Integrate component logging change from #3617
JNProtzman Sep 16, 2020
edba44f
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman Sep 16, 2020
ce409bf
remove unused method
JNProtzman Sep 16, 2020
eeeeaed
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman May 5, 2021
268cd71
less map lookups
JNProtzman May 5, 2021
eac772e
go vet & create statuses instead of strings
JNProtzman May 5, 2021
b4e27f6
Address PR comments
JNProtzman May 6, 2021
1de0a18
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman May 6, 2021
1ddc6ef
only process http status code when not in grpc mode
JNProtzman May 6, 2021
4007ee2
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman May 9, 2021
3e4245e
address code review comments
JNProtzman May 9, 2021
3e0cc45
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman May 11, 2021
fef76c2
Address PR comments
JNProtzman May 11, 2021
c1095bd
Don't use status.convert()
JNProtzman May 12, 2021
ac7193f
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman May 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
119 changes: 110 additions & 9 deletions internal/transport/http2_client.go
Expand Up @@ -1254,11 +1254,99 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

state := &decodeState{}
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
state.data.isGRPC = !initialHeader
if h2code, err := state.decodeHeader(frame); err != nil {
t.closeStream(s, err, true, h2code, status.Convert(err), nil, endStream)
// frame.Truncated is set to true when framer detects that the current header
// list size hits MaxHeaderListSize limit.
if frame.Truncated {
se := status.New(codes.Internal, "peer header list size exceeded limit")
t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
return
}

var (
// If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode.
isGRPC = !initialHeader
mdata = make(map[string][]string)
contentTypeErr string
grpcMessage string
statusGen *status.Status

grpcStatus string
JNProtzman marked this conversation as resolved.
Show resolved Hide resolved
httpStatus string
rawStatus string
)

for _, hf := range frame.Fields {
switch hf.Name {
case "content-type":
dfawley marked this conversation as resolved.
Show resolved Hide resolved
if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", hf.Value)
break
}
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
isGRPC = true
case "grpc-encoding":
s.recvCompress = hf.Value
case "grpc-status":
rawStatus = hf.Value
case "grpc-message":
grpcMessage = decodeGrpcMessage(hf.Value)
case "grpc-status-details-bin":
sg, err := decodeGRPCStatusDetails(hf.Value)
JNProtzman marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
grpcStatus = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
break
}
statusGen = sg
case ":status":
httpStatus = hf.Value
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
grpcStatus = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], v)
}

// Account for the fact that we can set isGRPC at any time - it's possible
// that we'll set isGRPC AFTER an error has occurred.
if isGRPC && grpcStatus != "" {
se := status.New(codes.Internal, grpcStatus)
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
}

if !isGRPC {
var (
code = codes.Internal // when header does not include HTTP status, return INTERNAL
httpStatusCode int
)

if httpStatus != "" {
c, err := strconv.ParseInt(httpStatus, 10, 32)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
httpStatusCode = int(c)

var ok bool
code, ok = HTTPStatusConvTab[httpStatusCode]
if !ok {
code = codes.Unknown
}
}

// Verify the HTTP response is a 200.
se := status.Error(code, constructHTTPErrMsg(&httpStatusCode, contentTypeErr))
t.closeStream(s, se, true, http2.ErrCodeProtocol, status.Convert(se), nil, endStream)
JNProtzman marked this conversation as resolved.
Show resolved Hide resolved
return
}

Expand Down Expand Up @@ -1293,9 +1381,8 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
s.recvCompress = state.data.encoding
if len(state.data.mdata) > 0 {
s.header = state.data.mdata
if len(mdata) > 0 {
s.header = mdata
}
} else {
// HEADERS frame block carries a Trailers-Only.
Expand All @@ -1308,9 +1395,23 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

if statusGen == nil {
rawStatusCode := codes.Unknown
if rawStatus != "" {
code, err := strconv.ParseInt(rawStatus, 10, 32)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
rawStatusCode = codes.Code(uint32(code))
}
statusGen = status.New(rawStatusCode, grpcMessage)
}

// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
}

// reader runs as a separate goroutine in charge of reading data from network
Expand Down
34 changes: 22 additions & 12 deletions internal/transport/http_util.go
Expand Up @@ -180,14 +180,6 @@ func isWhitelistedHeader(hdr string) bool {
}
}

func (d *decodeState) status() *status.Status {
if d.data.statusGen == nil {
// No status-details were provided; generate status using code/msg.
d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg)
}
return d.data.statusGen
}

const binHdrSuffix = "-bin"

func encodeBinHeader(v []byte) string {
Expand Down Expand Up @@ -217,6 +209,18 @@ func decodeMetadataHeader(k, v string) (string, error) {
return v, nil
}

func decodeGRPCStatusDetails(rawDetails string) (*status.Status, error) {
v, err := decodeBinHeader(rawDetails)
if err != nil {
return nil, err
}
st := &spb.Status{}
if err = proto.Unmarshal(v, st); err != nil {
return nil, err
}
return status.FromProto(st), nil
}

func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode, error) {
// frame.Truncated is set to true when framer detects that the current header
// list size hits MaxHeaderListSize limit.
Expand Down Expand Up @@ -271,18 +275,24 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) (http2.ErrCode
// constructErrMsg constructs error message to be returned in HTTP fallback mode.
// Format: HTTP status code and its corresponding message + content-type error message.
func (d *decodeState) constructHTTPErrMsg() string {
return constructHTTPErrMsg(d.data.httpStatus, d.data.contentTypeErr)
}

// constructErrMsg constructs error message to be returned in HTTP fallback mode.
// Format: HTTP status code and its corresponding message + content-type error message.
func constructHTTPErrMsg(httpStatus *int, contentTypeErr string) string {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
var errMsgs []string

if d.data.httpStatus == nil {
if httpStatus == nil {
errMsgs = append(errMsgs, "malformed header: missing HTTP status")
} else {
errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(d.data.httpStatus)), *d.data.httpStatus))
errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(httpStatus)), *httpStatus))
}

if d.data.contentTypeErr == "" {
if contentTypeErr == "" {
errMsgs = append(errMsgs, "transport: missing content-type field")
} else {
errMsgs = append(errMsgs, d.data.contentTypeErr)
errMsgs = append(errMsgs, contentTypeErr)
}

return strings.Join(errMsgs, "; ")
Expand Down