Skip to content

Commit

Permalink
stats: add stream info inside stats.Begin (#4533)
Browse files Browse the repository at this point in the history
  • Loading branch information
leviska committed Jun 18, 2021
1 parent 74fe073 commit 4faa31f
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 27 deletions.
8 changes: 6 additions & 2 deletions server.go
Expand Up @@ -1144,7 +1144,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if sh != nil {
beginTime := time.Now()
statsBegin = &stats.Begin{
BeginTime: beginTime,
BeginTime: beginTime,
IsClientStream: false,
IsServerStream: false,
}
sh.HandleRPC(stream.Context(), statsBegin)
}
Expand Down Expand Up @@ -1424,7 +1426,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if sh != nil {
beginTime := time.Now()
statsBegin = &stats.Begin{
BeginTime: beginTime,
BeginTime: beginTime,
IsClientStream: sd.ClientStreams,
IsServerStream: sd.ServerStreams,
}
sh.HandleRPC(stream.Context(), statsBegin)
}
Expand Down
4 changes: 4 additions & 0 deletions stats/stats.go
Expand Up @@ -45,6 +45,10 @@ type Begin struct {
BeginTime time.Time
// FailFast indicates if this RPC is failfast.
FailFast bool
// IsClientStream indicates whether the RPC is a client streaming RPC.
IsClientStream bool
// IsServerStream indicates whether the RPC is a server streaming RPC.
IsServerStream bool
}

// IsClient indicates if the stats information is from client side.
Expand Down
70 changes: 48 additions & 22 deletions stats/stats_test.go
Expand Up @@ -407,15 +407,17 @@ func (te *test) doServerStreamCall(c *rpcConfig) (*testpb.StreamingOutputCallReq
}

type expectedData struct {
method string
serverAddr string
compression string
reqIdx int
requests []proto.Message
respIdx int
responses []proto.Message
err error
failfast bool
method string
isClientStream bool
isServerStream bool
serverAddr string
compression string
reqIdx int
requests []proto.Message
respIdx int
responses []proto.Message
err error
failfast bool
}

type gotData struct {
Expand Down Expand Up @@ -456,6 +458,12 @@ func checkBegin(t *testing.T, d *gotData, e *expectedData) {
t.Fatalf("st.FailFast = %v, want %v", st.FailFast, e.failfast)
}
}
if st.IsClientStream != e.isClientStream {
t.Fatalf("st.IsClientStream = %v, want %v", st.IsClientStream, e.isClientStream)
}
if st.IsServerStream != e.isServerStream {
t.Fatalf("st.IsServerStream = %v, want %v", st.IsServerStream, e.isServerStream)
}
}

func checkInHeader(t *testing.T, d *gotData, e *expectedData) {
Expand Down Expand Up @@ -847,6 +855,9 @@ func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []f
err error
method string

isClientStream bool
isServerStream bool

req proto.Message
resp proto.Message
e error
Expand All @@ -864,14 +875,18 @@ func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []f
reqs, resp, e = te.doClientStreamCall(cc)
resps = []proto.Message{resp}
err = e
isClientStream = true
case serverStreamRPC:
method = "/grpc.testing.TestService/StreamingOutputCall"
req, resps, e = te.doServerStreamCall(cc)
reqs = []proto.Message{req}
err = e
isServerStream = true
case fullDuplexStreamRPC:
method = "/grpc.testing.TestService/FullDuplexCall"
reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
isClientStream = true
isServerStream = true
}
if cc.success != (err == nil) {
t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
Expand Down Expand Up @@ -900,12 +915,14 @@ func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []f
}

expect := &expectedData{
serverAddr: te.srvAddr,
compression: tc.compress,
method: method,
requests: reqs,
responses: resps,
err: err,
serverAddr: te.srvAddr,
compression: tc.compress,
method: method,
requests: reqs,
responses: resps,
err: err,
isClientStream: isClientStream,
isServerStream: isServerStream,
}

h.mu.Lock()
Expand Down Expand Up @@ -1138,6 +1155,9 @@ func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map
method string
err error

isClientStream bool
isServerStream bool

req proto.Message
resp proto.Message
e error
Expand All @@ -1154,14 +1174,18 @@ func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map
reqs, resp, e = te.doClientStreamCall(cc)
resps = []proto.Message{resp}
err = e
isClientStream = true
case serverStreamRPC:
method = "/grpc.testing.TestService/StreamingOutputCall"
req, resps, e = te.doServerStreamCall(cc)
reqs = []proto.Message{req}
err = e
isServerStream = true
case fullDuplexStreamRPC:
method = "/grpc.testing.TestService/FullDuplexCall"
reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
isClientStream = true
isServerStream = true
}
if cc.success != (err == nil) {
t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
Expand Down Expand Up @@ -1194,13 +1218,15 @@ func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map
}

expect := &expectedData{
serverAddr: te.srvAddr,
compression: tc.compress,
method: method,
requests: reqs,
responses: resps,
failfast: cc.failfast,
err: err,
serverAddr: te.srvAddr,
compression: tc.compress,
method: method,
requests: reqs,
responses: resps,
failfast: cc.failfast,
err: err,
isClientStream: isClientStream,
isServerStream: isServerStream,
}

h.mu.Lock()
Expand Down
8 changes: 5 additions & 3 deletions stream.go
Expand Up @@ -295,9 +295,11 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
beginTime = time.Now()
begin := &stats.Begin{
Client: true,
BeginTime: beginTime,
FailFast: c.failFast,
Client: true,
BeginTime: beginTime,
FailFast: c.failFast,
IsClientStream: desc.ClientStreams,
IsServerStream: desc.ServerStreams,
}
sh.HandleRPC(ctx, begin)
}
Expand Down

0 comments on commit 4faa31f

Please sign in to comment.