Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: remove decodeState from client to reduce allocations #3313

Merged
merged 34 commits into from May 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a4a5f28
transport: remove decodeState from http_client
JNProtzman Jan 12, 2020
cd907d6
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman Jan 26, 2020
3f62a6e
Remove grpcErr & httpErr
JNProtzman Jan 27, 2020
88d6557
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Feb 15, 2020
0804bd7
wip
JNProtzman Feb 15, 2020
7850d79
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Mar 8, 2020
650d182
Remove several local variables
JNProtzman Mar 8, 2020
e9bbc9b
Cleanup redundant code
JNProtzman Mar 8, 2020
00df58b
go fmt
JNProtzman Mar 8, 2020
44c840b
goimports
JNProtzman Mar 9, 2020
5b3ab1c
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Mar 30, 2020
515d41c
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Apr 6, 2020
bd4f7d2
Update to reduce code in loop
JNProtzman Apr 6, 2020
bc568f6
create statuses, instead of converting them
JNProtzman Apr 6, 2020
ebc4dc1
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Apr 15, 2020
c1fb093
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Apr 30, 2020
ae83159
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman May 2, 2020
32a2ae1
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman May 30, 2020
80a9c2e
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman Jul 2, 2020
e8ca122
Integrate component logging change from #3617
JNProtzman Sep 16, 2020
edba44f
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman Sep 16, 2020
ce409bf
remove unused method
JNProtzman Sep 16, 2020
eeeeaed
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman May 5, 2021
268cd71
less map lookups
JNProtzman May 5, 2021
eac772e
go vet & create statuses instead of strings
JNProtzman May 5, 2021
b4e27f6
Address PR comments
JNProtzman May 6, 2021
1de0a18
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman May 6, 2021
1ddc6ef
only process http status code when not in grpc mode
JNProtzman May 6, 2021
4007ee2
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman May 9, 2021
3e4245e
address code review comments
JNProtzman May 9, 2021
3e0cc45
Merge branch 'master' of https://github.com/JNProtzman/grpc-go into d…
JNProtzman May 11, 2021
fef76c2
Address PR comments
JNProtzman May 11, 2021
c1095bd
Don't use status.convert()
JNProtzman May 12, 2021
ac7193f
Merge branch 'master' of https://github.com/grpc/grpc-go into decodeS…
JNProtzman May 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
147 changes: 139 additions & 8 deletions internal/transport/http2_client.go
Expand Up @@ -30,9 +30,11 @@ import (
"sync/atomic"
"time"

"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"

spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -1179,14 +1181,139 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

state := &decodeState{}
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
state.data.isGRPC = !initialHeader
if err := state.decodeHeader(frame); err != nil {
// frame.Truncated is set to true when framer detects that the current header
// list size hits MaxHeaderListSize limit.
if frame.Truncated {
err := status.Error(codes.Internal, "peer header list size exceeded limit")
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
JNProtzman marked this conversation as resolved.
Show resolved Hide resolved
return
}

var (
// If a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
isGRPC = !initialHeader
mdata = make(map[string][]string)
statusGen *status.Status
grpcErr error
httpErr error
rawStatusCode *int
httpStatus *int
encoding string
contentTypeErr string
rawStatusMsg string
)
dfawley marked this conversation as resolved.
Show resolved Hide resolved

for _, hf := range frame.Fields {
switch hf.Name {
case "content-type":
dfawley marked this conversation as resolved.
Show resolved Hide resolved
_, validContentType := contentSubtype(hf.Value)
if !validContentType {
contentTypeErr = fmt.Sprintf("transport: received the unexpected content-type %q", hf.Value)
break
}
// TODO: do we want to propagate the whole content-type in the metadata,
// or come up with a way to just propagate the content-subtype if it was set?
// ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"}
// in the metadata?
JNProtzman marked this conversation as resolved.
Show resolved Hide resolved
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
isGRPC = true
case "grpc-encoding":
encoding = hf.Value
case "grpc-status":
code, err := strconv.Atoi(hf.Value)
JNProtzman marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
break
}
rawStatusCode = &code
case "grpc-message":
rawStatusMsg = decodeGrpcMessage(hf.Value)
case "grpc-status-details-bin":
v, err := decodeBinHeader(hf.Value)
if err != nil {
grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
break
}
s := &spb.Status{}
if err := proto.Unmarshal(v, s); err != nil {
grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
break
}
statusGen = status.FromProto(s)
case ":status":
code, err := strconv.Atoi(hf.Value)
if err != nil {
httpErr = status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
break
}
httpStatus = &code
case "grpc-tags-bin":
v, err := decodeBinHeader(hf.Value)
if err != nil {
grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], string(v))
case "grpc-trace-bin":
v, err := decodeBinHeader(hf.Value)
if err != nil {
grpcErr = status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], string(v))
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
errorf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], v)
}
}

if isGRPC {
if grpcErr != nil {
t.closeStream(s, grpcErr, true, http2.ErrCodeProtocol, status.Convert(grpcErr), nil, endStream)
return
}
if rawStatusCode == nil && statusGen == nil {
// gRPC status doesn't exist.
// Set rawStatusCode to be unknown and return nil error.
// So that, if the stream has ended this Unknown status
// will be propagated to the user.
// Otherwise, it will be ignored. In which case, status from
// a later trailer, that has StreamEnded flag set, is propagated.
code := int(codes.Unknown)
rawStatusCode = &code
}
} else {
if httpErr != nil {
t.closeStream(s, httpErr, true, http2.ErrCodeProtocol, status.Convert(httpErr), nil, endStream)
return
}

var (
code = codes.Internal // when header does not include HTTP status, return INTERNAL
ok bool
)

if httpStatus != nil {
code, ok = HTTPStatusConvTab[*(httpStatus)]
JNProtzman marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
code = codes.Unknown
}
}

msg := constructHTTPErrMsg(httpStatus, contentTypeErr)
if err := status.Error(code, msg); err != nil {
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
return
}
}

isHeader := false
defer func() {
if t.statsHandler != nil {
Expand Down Expand Up @@ -1217,9 +1344,9 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
s.recvCompress = state.data.encoding
if len(state.data.mdata) > 0 {
s.header = state.data.mdata
s.recvCompress = encoding
if len(mdata) > 0 {
s.header = mdata
}
} else {
// HEADERS frame block carries a Trailers-Only.
Expand All @@ -1234,7 +1361,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {

// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)

if statusGen == nil {
statusGen = status.New(codes.Code(int32(*(rawStatusCode))), rawStatusMsg)
}
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
}

// reader runs as a separate goroutine in charge of reading data from network
Expand Down
28 changes: 20 additions & 8 deletions internal/transport/http_util.go
Expand Up @@ -222,14 +222,6 @@ func contentType(contentSubtype string) string {
return baseContentType + "+" + contentSubtype
}

func (d *decodeState) status() *status.Status {
if d.data.statusGen == nil {
// No status-details were provided; generate status using code/msg.
d.data.statusGen = status.New(codes.Code(int32(*(d.data.rawStatusCode))), d.data.rawStatusMsg)
}
return d.data.statusGen
}

const binHdrSuffix = "-bin"

func encodeBinHeader(v []byte) string {
Expand Down Expand Up @@ -330,6 +322,26 @@ func (d *decodeState) constructHTTPErrMsg() string {
return strings.Join(errMsgs, "; ")
}

// constructErrMsg constructs error message to be returned in HTTP fallback mode.
// Format: HTTP status code and its corresponding message + content-type error message.
func constructHTTPErrMsg(httpStatus *int, contentTypeErr string) string {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
var errMsgs []string

if httpStatus == nil {
errMsgs = append(errMsgs, "malformed header: missing HTTP status")
} else {
errMsgs = append(errMsgs, fmt.Sprintf("%s: HTTP status code %d", http.StatusText(*(httpStatus)), *httpStatus))
}

if contentTypeErr == "" {
errMsgs = append(errMsgs, "transport: missing content-type field")
} else {
errMsgs = append(errMsgs, contentTypeErr)
}

return strings.Join(errMsgs, "; ")
}

func (d *decodeState) addMetadata(k, v string) {
if d.data.mdata == nil {
d.data.mdata = make(map[string][]string)
Expand Down