Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Add started RPC metric for client and server side #1283

Merged
merged 2 commits into from Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions plugin/ocgrpc/client_metrics.go
Expand Up @@ -28,6 +28,7 @@ var (
ClientReceivedMessagesPerRPC = stats.Int64("grpc.io/client/received_messages_per_rpc", "Number of response messages received per RPC (always 1 for non-streaming RPCs).", stats.UnitDimensionless)
ClientReceivedBytesPerRPC = stats.Int64("grpc.io/client/received_bytes_per_rpc", "Total bytes received across all response messages per RPC.", stats.UnitBytes)
ClientRoundtripLatency = stats.Float64("grpc.io/client/roundtrip_latency", "Time between first byte of request sent to last byte of response received, or terminal error.", stats.UnitMilliseconds)
ClientStartedRPCs = stats.Int64("grpc.io/client/started_rpcs", "Number of started client RPCs.", stats.UnitDimensionless)
ClientServerLatency = stats.Float64("grpc.io/client/server_latency", `Propagated from the server and should have the same value as "grpc.io/server/latency".`, stats.UnitMilliseconds)
)

Expand Down Expand Up @@ -70,6 +71,14 @@ var (
Aggregation: view.Count(),
}

ClientStartedRPCsView = &view.View{
Measure: ClientStartedRPCs,
Name: "grpc.io/client/started_rpcs",
Description: "Number of started client RPCs.",
TagKeys: []tag.Key{KeyClientMethod},
Aggregation: view.Count(),
}

ClientSentMessagesPerRPCView = &view.View{
Measure: ClientSentMessagesPerRPC,
Name: "grpc.io/client/sent_messages_per_rpc",
Expand Down
12 changes: 12 additions & 0 deletions plugin/ocgrpc/end_to_end_test.go
Expand Up @@ -40,6 +40,8 @@ func TestEndToEnd_Single(t *testing.T) {
ocgrpc.ClientReceivedMessagesPerRPCView,
ocgrpc.ServerSentMessagesPerRPCView,
ocgrpc.ClientSentMessagesPerRPCView,
ocgrpc.ServerStartedRPCsView,
ocgrpc.ClientStartedRPCsView,
}
view.Register(extraViews...)
defer view.Unregister(extraViews...)
Expand All @@ -63,10 +65,14 @@ func TestEndToEnd_Single(t *testing.T) {
if err != nil {
t.Fatal(err)
}
checkCount(t, ocgrpc.ClientStartedRPCsView, 1, clientMethodTag)
checkCount(t, ocgrpc.ServerStartedRPCsView, 1, serverMethodTag)
checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, clientStatusOKTag)
checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, serverStatusOKTag)

_, _ = client.Single(ctx, &testpb.FooRequest{Fail: true})
checkCount(t, ocgrpc.ClientStartedRPCsView, 2, clientMethodTag)
checkCount(t, ocgrpc.ServerStartedRPCsView, 2, serverMethodTag)
checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, serverStatusUnknownTag)
checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, clientStatusUnknownTag)

Expand Down Expand Up @@ -101,6 +107,7 @@ func TestEndToEnd_Single(t *testing.T) {
func TestEndToEnd_Stream(t *testing.T) {
view.Register(ocgrpc.DefaultClientViews...)
defer view.Unregister(ocgrpc.DefaultClientViews...)

view.Register(ocgrpc.DefaultServerViews...)
defer view.Unregister(ocgrpc.DefaultServerViews...)

Expand All @@ -109,6 +116,8 @@ func TestEndToEnd_Stream(t *testing.T) {
ocgrpc.ClientReceivedMessagesPerRPCView,
ocgrpc.ServerSentMessagesPerRPCView,
ocgrpc.ClientSentMessagesPerRPCView,
ocgrpc.ClientStartedRPCsView,
ocgrpc.ServerStartedRPCsView,
}
view.Register(extraViews...)
defer view.Unregister(extraViews...)
Expand Down Expand Up @@ -146,6 +155,8 @@ func TestEndToEnd_Stream(t *testing.T) {
t.Fatal(err)
}

checkCount(t, ocgrpc.ClientStartedRPCsView, 1, clientMethodTag)
checkCount(t, ocgrpc.ServerStartedRPCsView, 1, serverMethodTag)
checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, clientStatusOKTag)
checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, serverStatusOKTag)

Expand Down Expand Up @@ -183,6 +194,7 @@ func getCount(t *testing.T, v *view.View, tags ...tag.Tag) (int64, bool) {
return 0, false
}
}

rows, err := view.RetrieveData(v.Name)
if err != nil {
t.Fatal(err)
Expand Down
9 changes: 9 additions & 0 deletions plugin/ocgrpc/server_metrics.go
Expand Up @@ -27,6 +27,7 @@ var (
ServerReceivedBytesPerRPC = stats.Int64("grpc.io/server/received_bytes_per_rpc", "Total bytes received across all messages per RPC.", stats.UnitBytes)
ServerSentMessagesPerRPC = stats.Int64("grpc.io/server/sent_messages_per_rpc", "Number of messages sent in each RPC. Has value 1 for non-streaming RPCs.", stats.UnitDimensionless)
ServerSentBytesPerRPC = stats.Int64("grpc.io/server/sent_bytes_per_rpc", "Total bytes sent in across all response messages per RPC.", stats.UnitBytes)
ServerStartedRPCs = stats.Int64("grpc.io/server/started_rpcs", "Number of started server RPCs.", stats.UnitDimensionless)
ServerLatency = stats.Float64("grpc.io/server/server_latency", "Time between first byte of request received to last byte of response sent, or terminal error.", stats.UnitMilliseconds)
)

Expand Down Expand Up @@ -73,6 +74,14 @@ var (
Aggregation: view.Count(),
}

ServerStartedRPCsView = &view.View{
Measure: ServerStartedRPCs,
Name: "grpc.io/server/started_rpcs",
Description: "Number of started server RPCs.",
TagKeys: []tag.Key{KeyServerMethod},
Aggregation: view.Count(),
}

ServerReceivedMessagesPerRPCView = &view.View{
Name: "grpc.io/server/received_messages_per_rpc",
Description: "Distribution of messages received count per RPC, by method.",
Expand Down
23 changes: 22 additions & 1 deletion plugin/ocgrpc/stats_common.go
Expand Up @@ -82,8 +82,10 @@ func methodName(fullname string) string {
// statsHandleRPC processes the RPC events.
func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
switch st := s.(type) {
case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
case *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
// do nothing for client
case *stats.Begin:
handleRPCBegin(ctx, st)
case *stats.OutPayload:
handleRPCOutPayload(ctx, st)
case *stats.InPayload:
Expand All @@ -95,6 +97,25 @@ func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
}
}

func handleRPCBegin(ctx context.Context, s *stats.Begin) {
d, ok := ctx.Value(rpcDataKey).(*rpcData)
if !ok {
if grpclog.V(2) {
grpclog.Infoln("Failed to retrieve *rpcData from context.")
}
}

if s.IsClient() {
ocstats.RecordWithOptions(ctx,
ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
ocstats.WithMeasurements(ClientStartedRPCs.M(1)))
} else {
ocstats.RecordWithOptions(ctx,
ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
ocstats.WithMeasurements(ServerStartedRPCs.M(1)))
}
}

func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
d, ok := ctx.Value(rpcDataKey).(*rpcData)
if !ok {
Expand Down
6 changes: 3 additions & 3 deletions stats/view/worker_test.go
Expand Up @@ -180,14 +180,14 @@ func Test_Worker_MultiExport(t *testing.T) {

// Format is Resource.Labels encoded as string, then
wantPartialData := map[string][]*Row{
makeKey(nil, count.Name): []*Row{
makeKey(nil, count.Name): {
{[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}},
{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
},
makeKey(nil, sum.Name): []*Row{
makeKey(nil, sum.Name): {
{nil, &SumData{Value: 7.5}},
},
makeKey(&extraResource, count.Name): []*Row{
makeKey(&extraResource, count.Name): {
{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
},
}
Expand Down