Skip to content

Commit

Permalink
OpenCensus Plugin: Add missing measure and views for started RPCs (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
yashykt committed Sep 21, 2022
1 parent 2d94537 commit 9cff4d2
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/cpp/ext/filters/census/client_filter.cc
Expand Up @@ -114,6 +114,10 @@ OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
start_time_(absl::Now()) {
context_.AddSpanAttribute("previous-rpc-attempts", attempt_num);
context_.AddSpanAttribute("transparent-retry", is_transparent_retry);
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
::opencensus::stats::Record({{RpcClientStartedRpcs(), 1}}, tags);
}

void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
Expand Down
8 changes: 8 additions & 0 deletions src/cpp/ext/filters/census/grpc_plugin.cc
Expand Up @@ -53,6 +53,7 @@ void RegisterOpenCensusPlugin() {
RpcClientReceivedBytesPerRpc();
RpcClientRoundtripLatency();
RpcClientServerLatency();
RpcClientStartedRpcs();
RpcClientSentMessagesPerRpc();
RpcClientReceivedMessagesPerRpc();
RpcClientRetriesPerCall();
Expand All @@ -62,6 +63,7 @@ void RegisterOpenCensusPlugin() {
RpcServerSentBytesPerRpc();
RpcServerReceivedBytesPerRpc();
RpcServerServerLatency();
RpcServerStartedRpcs();
RpcServerSentMessagesPerRpc();
RpcServerReceivedMessagesPerRpc();
}
Expand Down Expand Up @@ -123,6 +125,9 @@ ABSL_CONST_INIT const absl::string_view kRpcClientRoundtripLatencyMeasureName =
ABSL_CONST_INIT const absl::string_view kRpcClientServerLatencyMeasureName =
"grpc.io/client/server_latency";

ABSL_CONST_INIT const absl::string_view kRpcClientStartedRpcsMeasureName =
"grpc.io/client/started_rpcs";

ABSL_CONST_INIT const absl::string_view kRpcClientRetriesPerCallMeasureName =
"grpc.io/client/retries_per_call";

Expand Down Expand Up @@ -151,4 +156,7 @@ ABSL_CONST_INIT const absl::string_view

ABSL_CONST_INIT const absl::string_view kRpcServerServerLatencyMeasureName =
"grpc.io/server/server_latency";

ABSL_CONST_INIT const absl::string_view kRpcServerStartedRpcsMeasureName =
"grpc.io/server/started_rpcs";
} // namespace grpc
8 changes: 8 additions & 0 deletions src/cpp/ext/filters/census/grpc_plugin.h
Expand Up @@ -41,6 +41,7 @@ extern const absl::string_view kRpcClientReceivedMessagesPerRpcMeasureName;
extern const absl::string_view kRpcClientReceivedBytesPerRpcMeasureName;
extern const absl::string_view kRpcClientRoundtripLatencyMeasureName;
extern const absl::string_view kRpcClientServerLatencyMeasureName;
extern const absl::string_view kRpcClientStartedRpcsMeasureName;
extern const absl::string_view kRpcClientRetriesPerCallMeasureName;
extern const absl::string_view kRpcClientTransparentRetriesPerCallMeasureName;
extern const absl::string_view kRpcClientRetryDelayPerCallMeasureName;
Expand All @@ -50,6 +51,7 @@ extern const absl::string_view kRpcServerSentBytesPerRpcMeasureName;
extern const absl::string_view kRpcServerReceivedMessagesPerRpcMeasureName;
extern const absl::string_view kRpcServerReceivedBytesPerRpcMeasureName;
extern const absl::string_view kRpcServerServerLatencyMeasureName;
extern const absl::string_view kRpcServerStartedRpcsMeasureName;

// Canonical gRPC view definitions.
const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcCumulative();
Expand All @@ -60,6 +62,7 @@ const ::opencensus::stats::ViewDescriptor&
ClientReceivedBytesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyCumulative();
const ::opencensus::stats::ViewDescriptor& ClientServerLatencyCumulative();
const ::opencensus::stats::ViewDescriptor& ClientStartedRpcsCumulative();
const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsCumulative();
const ::opencensus::stats::ViewDescriptor& ClientRetriesPerCallCumulative();
const ::opencensus::stats::ViewDescriptor& ClientRetriesCumulative();
Expand All @@ -73,6 +76,7 @@ const ::opencensus::stats::ViewDescriptor&
ServerReceivedBytesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor& ServerServerLatencyCumulative();
const ::opencensus::stats::ViewDescriptor& ServerStartedCountCumulative();
const ::opencensus::stats::ViewDescriptor& ServerStartedRpcsCumulative();
const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsCumulative();
const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor&
Expand All @@ -84,6 +88,7 @@ const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyMinute();
const ::opencensus::stats::ViewDescriptor& ClientServerLatencyMinute();
const ::opencensus::stats::ViewDescriptor& ClientStartedRpcsMinute();
const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsMinute();
const ::opencensus::stats::ViewDescriptor& ClientRetriesPerCallMinute();
const ::opencensus::stats::ViewDescriptor& ClientRetriesMinute();
Expand All @@ -97,6 +102,7 @@ const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ServerServerLatencyMinute();
const ::opencensus::stats::ViewDescriptor& ServerStartedRpcsMinute();
const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsMinute();

const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcHour();
Expand All @@ -105,6 +111,7 @@ const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyHour();
const ::opencensus::stats::ViewDescriptor& ClientServerLatencyHour();
const ::opencensus::stats::ViewDescriptor& ClientStartedRpcsHour();
const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsHour();
const ::opencensus::stats::ViewDescriptor& ClientRetriesPerCallHour();
const ::opencensus::stats::ViewDescriptor& ClientRetriesHour();
Expand All @@ -119,6 +126,7 @@ const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ServerServerLatencyHour();
const ::opencensus::stats::ViewDescriptor& ServerStartedCountHour();
const ::opencensus::stats::ViewDescriptor& ServerStartedRpcsHour();
const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsHour();

} // namespace grpc
Expand Down
18 changes: 18 additions & 0 deletions src/cpp/ext/filters/census/measures.cc
Expand Up @@ -89,6 +89,15 @@ MeasureInt64 RpcClientReceivedMessagesPerRpc() {
return measure;
}

MeasureInt64 RpcClientStartedRpcs() {
static const auto measure =
MeasureInt64::Register(kRpcClientStartedRpcsMeasureName,
"The total number of client RPCs ever opened, "
"including those that have not been completed.",
kCount);
return measure;
}

// Client per-overall-client-call measures
MeasureInt64 RpcClientRetriesPerCall() {
static const auto measure =
Expand Down Expand Up @@ -139,6 +148,15 @@ MeasureDouble RpcServerServerLatency() {
return measure;
}

MeasureInt64 RpcServerStartedRpcs() {
static const auto measure =
MeasureInt64::Register(kRpcServerStartedRpcsMeasureName,
"The total number of server RPCs ever opened, "
"including those that have not been completed.",
kCount);
return measure;
}

MeasureInt64 RpcServerSentMessagesPerRpc() {
static const auto measure =
MeasureInt64::Register(kRpcServerSentMessagesPerRpcMeasureName,
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/ext/filters/census/measures.h
Expand Up @@ -31,6 +31,7 @@ ::opencensus::stats::MeasureInt64 RpcClientReceivedMessagesPerRpc();
::opencensus::stats::MeasureDouble RpcClientReceivedBytesPerRpc();
::opencensus::stats::MeasureDouble RpcClientRoundtripLatency();
::opencensus::stats::MeasureDouble RpcClientServerLatency();
::opencensus::stats::MeasureInt64 RpcClientStartedRpcs();
::opencensus::stats::MeasureInt64 RpcClientCompletedRpcs();
::opencensus::stats::MeasureInt64 RpcClientRetriesPerCall();
::opencensus::stats::MeasureInt64 RpcClientTransparentRetriesPerCall();
Expand All @@ -41,6 +42,7 @@ ::opencensus::stats::MeasureDouble RpcServerSentBytesPerRpc();
::opencensus::stats::MeasureInt64 RpcServerReceivedMessagesPerRpc();
::opencensus::stats::MeasureDouble RpcServerReceivedBytesPerRpc();
::opencensus::stats::MeasureDouble RpcServerServerLatency();
::opencensus::stats::MeasureInt64 RpcServerStartedRpcs();
::opencensus::stats::MeasureInt64 RpcServerCompletedRpcs();

} // namespace grpc
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/ext/filters/census/server_filter.cc
Expand Up @@ -106,6 +106,8 @@ void CensusServerCallData::OnDoneRecvInitialMetadataCb(
calld->qualified_method_, &calld->context_);
grpc_census_call_set_context(
calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
::opencensus::stats::Record({{RpcServerStartedRpcs(), 1}},
{{ServerMethodTagKey(), calld->method_}});
}
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->initial_on_done_recv_initial_metadata_,
Expand Down
60 changes: 60 additions & 0 deletions src/cpp/ext/filters/census/views.cc
Expand Up @@ -129,6 +129,16 @@ const ViewDescriptor& ClientServerLatencyCumulative() {
return descriptor;
}

const ViewDescriptor& ClientStartedRpcsCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/client/started_rpcs/cumulative")
.set_measure(kRpcClientStartedRpcsMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ClientMethodTagKey());
return descriptor;
}

const ViewDescriptor& ClientCompletedRpcsCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
Expand Down Expand Up @@ -241,6 +251,16 @@ const ViewDescriptor& ServerServerLatencyCumulative() {
return descriptor;
}

const ViewDescriptor& ServerStartedRpcsCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/server/started_rpcs/cumulative")
.set_measure(kRpcServerStartedRpcsMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ServerMethodTagKey());
return descriptor;
}

const ViewDescriptor& ServerCompletedRpcsCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
Expand Down Expand Up @@ -313,6 +333,16 @@ const ViewDescriptor& ClientServerLatencyMinute() {
return descriptor;
}

const ViewDescriptor& ClientStartedRpcsMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/client/started_rpcs/minute")
.set_measure(kRpcClientStartedRpcsMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ClientMethodTagKey());
return descriptor;
}

const ViewDescriptor& ClientCompletedRpcsMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
Expand Down Expand Up @@ -425,6 +455,16 @@ const ViewDescriptor& ServerServerLatencyMinute() {
return descriptor;
}

const ViewDescriptor& ServerStartedRpcsMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/server/started_rpcs/minute")
.set_measure(kRpcServerStartedRpcsMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ServerMethodTagKey());
return descriptor;
}

const ViewDescriptor& ServerCompletedRpcsMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
Expand Down Expand Up @@ -497,6 +537,16 @@ const ViewDescriptor& ClientServerLatencyHour() {
return descriptor;
}

const ViewDescriptor& ClientStartedRpcsHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/client/started_rpcs/hour")
.set_measure(kRpcClientStartedRpcsMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ClientMethodTagKey());
return descriptor;
}

const ViewDescriptor& ClientCompletedRpcsHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
Expand Down Expand Up @@ -609,6 +659,16 @@ const ViewDescriptor& ServerServerLatencyHour() {
return descriptor;
}

const ViewDescriptor& ServerStartedRpcsHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/server/started_rpcs/hour")
.set_measure(kRpcServerStartedRpcsMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ClientMethodTagKey());
return descriptor;
}

const ViewDescriptor& ServerCompletedRpcsHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
Expand Down
48 changes: 48 additions & 0 deletions test/cpp/ext/filters/census/stats_plugin_end2end_test.cc
Expand Up @@ -420,6 +420,54 @@ TEST_F(StatsPluginEnd2EndTest, Latency) {
::testing::DoubleEq(client_elapsed_time))))));
}

TEST_F(StatsPluginEnd2EndTest, StartedRpcs) {
View client_started_rpcs_view(ClientStartedRpcsCumulative());
View server_started_rpcs_view(ServerStartedRpcsCumulative());

EchoRequest request;
request.set_message("foo");
EchoResponse response;
const int count = 5;
for (int i = 0; i < count; ++i) {
{
grpc::ClientContext context;
grpc::Status status = stub_->Echo(&context, request, &response);
ASSERT_TRUE(status.ok());
EXPECT_EQ("foo", response.message());
}
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
TestUtils::Flush();

EXPECT_THAT(client_started_rpcs_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_), i + 1)));
EXPECT_THAT(server_started_rpcs_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(server_method_name_), i + 1)));
}

// Client should see started calls that are not yet completed.
{
ClientContext ctx;
auto stream = stub_->BidiStream(&ctx);
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
TestUtils::Flush();
EXPECT_THAT(
client_started_rpcs_view.GetData().int_data(),
::testing::Contains(::testing::Pair(
::testing::ElementsAre("grpc.testing.EchoTestService/BidiStream"),
1)));
EXPECT_THAT(
server_started_rpcs_view.GetData().int_data(),
::testing::Contains(::testing::Pair(
::testing::ElementsAre("grpc.testing.EchoTestService/BidiStream"),
1)));
ctx.TryCancel();
}
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
TestUtils::Flush();
}

TEST_F(StatsPluginEnd2EndTest, CompletedRpcs) {
View client_completed_rpcs_view(ClientCompletedRpcsCumulative());
View server_completed_rpcs_view(ServerCompletedRpcsCumulative());
Expand Down

0 comments on commit 9cff4d2

Please sign in to comment.