Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC server metrics #14922

Merged
merged 7 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/14922.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
grpc: Added metrics for external gRPC server. Added `server_type=internal|external` label to gRPC metrics.
```
12 changes: 12 additions & 0 deletions agent/grpc-external/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package external
import (
"time"

"github.com/armon/go-metrics"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc"
Expand All @@ -11,20 +12,31 @@ import (
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
)

var (
defaultMetrics = agentmiddleware.DefaultMetrics
metricsLabels = []metrics.Label{{
Name: "server_type",
Value: "external",
}}
)

// NewServer constructs a gRPC server for the external gRPC port, to which
// handlers can be registered.
func NewServer(logger agentmiddleware.Logger) *grpc.Server {
recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger)

metrics := defaultMetrics()
opts := []grpc.ServerOption{
grpc.MaxConcurrentStreams(2048),
grpc.StatsHandler(agentmiddleware.NewStatsHandler(metrics, metricsLabels)),
middleware.WithUnaryServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.UnaryServerInterceptor(recoveryOpts...),
),
middleware.WithStreamServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.StreamServerInterceptor(recoveryOpts...),
agentmiddleware.NewActiveStreamCounter(metrics, metricsLabels).Intercept,
),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
// This must be less than the keealive.ClientParameters Time setting, otherwise
Expand Down
130 changes: 130 additions & 0 deletions agent/grpc-external/stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package external

import (
"context"
"net"
"sort"
"testing"
"time"

"github.com/armon/go-metrics"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/grpc-middleware/testutil"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
"github.com/hashicorp/consul/proto/prototest"
)

func TestServer_EmitsStats(t *testing.T) {
Copy link
Author

@pglass pglass Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This copies the TestHandler_EmitStats test. I did some deduplication of the helpers by moving things over to grpc-middleware/testutil. (I could do more deduplication but kind of wanted a sanity check before I go too far.)

sink, reset := patchGlobalMetrics(t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of passing the metrics object as a parameter to NewServer so we don't have to change global state like this?

func NewServer(logger agentmiddleware.Logger, metrics *metrics.Metrics) *grpc.Server {
	if metrics == nil {
		metrics = agentmiddleware.DefaultMetrics
	}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also appreciate that these tests were copied from grpc-internal so please feel free to leave as-is! 🙇🏻‍♂️

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 I removed the global metrics instance from both grpc-internal and grpc-external


srv := NewServer(hclog.Default())
reset()

testservice.RegisterSimpleServer(srv, &testservice.Simple{})

lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return srv.Serve(lis)
})
t.Cleanup(func() {
srv.Stop()
if err := g.Wait(); err != nil {
t.Logf("grpc server error: %v", err)
}
})

conn, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure())
require.NoError(t, err)
t.Cleanup(func() { conn.Close() })

client := testservice.NewSimpleClient(conn)
fClient, err := client.Flow(ctx, &testservice.Req{Datacenter: "mine"})
require.NoError(t, err)

// Wait for the first event so that we know the stream is sending.
_, err = fClient.Recv()
require.NoError(t, err)

cancel()
conn.Close()
srv.GracefulStop()
// Wait for the server to stop so that active_streams is predictable.
require.NoError(t, g.Wait())

// Occasionally the active_stream=0 metric may be emitted before the
// active_conns=0 metric. The order of those metrics is not really important
// so we sort the calls to match the expected.
sort.Slice(sink.GaugeCalls, func(i, j int) bool {
if i < 2 || j < 2 {
return i < j
}
if len(sink.GaugeCalls[i].Key) < 4 || len(sink.GaugeCalls[j].Key) < 4 {
return i < j
}
return sink.GaugeCalls[i].Key[3] < sink.GaugeCalls[j].Key[3]
})

cmpMetricCalls := cmp.AllowUnexported(testutil.MetricCall{})
expLabels := []metrics.Label{{
Name: "server_type",
Value: "external",
}}
expectedGauge := []testutil.MetricCall{
{Key: []string{"testing", "grpc", "server", "connections"}, Val: 1, Labels: expLabels},
{Key: []string{"testing", "grpc", "server", "streams"}, Val: 1, Labels: expLabels},
{Key: []string{"testing", "grpc", "server", "connections"}, Val: 0, Labels: expLabels},
{Key: []string{"testing", "grpc", "server", "streams"}, Val: 0, Labels: expLabels},
}
prototest.AssertDeepEqual(t, expectedGauge, sink.GaugeCalls, cmpMetricCalls)

expectedCounter := []testutil.MetricCall{
{Key: []string{"testing", "grpc", "server", "connection", "count"}, Val: 1, Labels: expLabels},
{Key: []string{"testing", "grpc", "server", "request", "count"}, Val: 1, Labels: expLabels},
{Key: []string{"testing", "grpc", "server", "stream", "count"}, Val: 1, Labels: expLabels},
}
prototest.AssertDeepEqual(t, expectedCounter, sink.IncrCounterCalls, cmpMetricCalls)
}

func patchGlobalMetrics(t *testing.T) (*testutil.FakeMetricsSink, func()) {
t.Helper()

sink := &testutil.FakeMetricsSink{}
cfg := &metrics.Config{
ServiceName: "testing",
TimerGranularity: time.Millisecond, // Timers are in milliseconds
ProfileInterval: time.Second, // Poll runtime every second
FilterDefault: true,
}
var err error
defaultMetrics = func() *metrics.Metrics {
m, _ := metrics.New(cfg, sink)
return m
}
require.NoError(t, err)
reset := func() {
t.Helper()
defaultMetrics = metrics.Default
require.NoError(t, err, "failed to reset global metrics")
}
return sink, reset
}

func logError(t *testing.T, f func() error) func() {
return func() {
if err := f(); err != nil {
t.Logf(err.Error())
}
}
}
3 changes: 2 additions & 1 deletion agent/grpc-internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/tlsutil"
Expand Down Expand Up @@ -129,7 +130,7 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien
grpc.WithInsecure(),
grpc.WithContextDialer(c.dialer),
grpc.WithDisableRetry(),
grpc.WithStatsHandler(newStatsHandler(defaultMetrics())),
grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(agentmiddleware.DefaultMetrics(), metricsLabels)),
// nolint:staticcheck // there is no other supported alternative to WithBalancerName
grpc.WithBalancerName("pick_first"),
// Keep alive parameters are based on the same default ones we used for
Expand Down
2 changes: 1 addition & 1 deletion agent/grpc-internal/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/grpc-internal/internal/testservice"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/sdk/freeport"
Expand Down
15 changes: 12 additions & 3 deletions agent/grpc-internal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"time"

"github.com/armon/go-metrics"
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"

middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -13,26 +14,34 @@ import (
"google.golang.org/grpc/keepalive"
)

var (
defaultMetrics = agentmiddleware.DefaultMetrics
metricsLabels = []metrics.Label{{
Name: "server_type",
Value: "internal",
}}
)

// NewHandler returns a gRPC server that accepts connections from Handle(conn).
// The register function will be called with the grpc.Server to register
// gRPC services with the server.
func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server)) *Handler {
metrics := defaultMetrics()

// We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured.
recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger)

metrics := defaultMetrics()
opts := []grpc.ServerOption{
grpc.StatsHandler(newStatsHandler(metrics)),
grpc.StatsHandler(agentmiddleware.NewStatsHandler(metrics, metricsLabels)),
middleware.WithUnaryServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.UnaryServerInterceptor(recoveryOpts...),
),
middleware.WithStreamServerChain(
// Add middlware interceptors to recover in case of panics.
recovery.StreamServerInterceptor(recoveryOpts...),
(&activeStreamCounter{metrics: metrics}).Intercept,
agentmiddleware.NewActiveStreamCounter(metrics, metricsLabels).Intercept,
),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 15 * time.Second,
Expand Down
5 changes: 3 additions & 2 deletions agent/grpc-internal/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/consul/agent/grpc-internal/internal/testservice"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
)

func TestHandler_PanicRecoveryInterceptor(t *testing.T) {
Expand Down Expand Up @@ -57,5 +57,6 @@ func TestHandler_PanicRecoveryInterceptor(t *testing.T) {
// Checking the entire stack trace is not possible, let's
// make sure that it contains a couple of expected strings.
require.Contains(t, strLog, `[ERROR] panic serving grpc request: panic="panic from Something`)
require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc-internal.(*simplePanic).Something`)
require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice.(*SimplePanic).Something`)

}
44 changes: 3 additions & 41 deletions agent/grpc-internal/server_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package internal

import (
"context"
"crypto/tls"
"fmt"
"io"
Expand All @@ -15,7 +14,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/hashicorp/consul/agent/grpc-internal/internal/testservice"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/tlsutil"
Expand All @@ -42,14 +41,14 @@ func (s testServer) Metadata() *metadata.Server {

func newSimpleTestServer(t *testing.T, name, dc string, tlsConf *tlsutil.Configurator) testServer {
return newTestServer(t, hclog.Default(), name, dc, tlsConf, func(server *grpc.Server) {
testservice.RegisterSimpleServer(server, &simple{name: name, dc: dc})
testservice.RegisterSimpleServer(server, &testservice.Simple{Name: name, DC: dc})
})
}

// newPanicTestServer sets up a simple server with handlers that panic.
func newPanicTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator) testServer {
return newTestServer(t, logger, name, dc, tlsConf, func(server *grpc.Server) {
testservice.RegisterSimpleServer(server, &simplePanic{name: name, dc: dc})
testservice.RegisterSimpleServer(server, &testservice.SimplePanic{Name: name, DC: dc})
})
}

Expand Down Expand Up @@ -95,43 +94,6 @@ func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *
}
}

type simple struct {
name string
dc string
}

func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
for flow.Context().Err() == nil {
resp := &testservice.Resp{ServerName: "one", Datacenter: s.dc}
if err := flow.Send(resp); err != nil {
return err
}
time.Sleep(time.Millisecond)
}
return nil
}

func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
return &testservice.Resp{ServerName: s.name, Datacenter: s.dc}, nil
}

type simplePanic struct {
name, dc string
}

func (s *simplePanic) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
for flow.Context().Err() == nil {
time.Sleep(time.Millisecond)
panic("panic from Flow")
}
return nil
}

func (s *simplePanic) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
time.Sleep(time.Millisecond)
panic("panic from Something")
}

// fakeRPCListener mimics agent/consul.Server.listen to handle the RPCType byte.
// In the future we should be able to refactor Server and extract this RPC
// handling logic so that we don't need to use a fake.
Expand Down
22 changes: 13 additions & 9 deletions agent/grpc-internal/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/grpc-internal/internal/testservice"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
"github.com/hashicorp/consul/proto/prototest"
)

Expand All @@ -29,7 +29,7 @@ func TestHandler_EmitsStats(t *testing.T) {
handler := NewHandler(hclog.Default(), addr, noopRegister)
reset()

testservice.RegisterSimpleServer(handler.srv, &simple{})
testservice.RegisterSimpleServer(handler.srv, &testservice.Simple{})

lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
Expand Down Expand Up @@ -82,18 +82,22 @@ func TestHandler_EmitsStats(t *testing.T) {
})

cmpMetricCalls := cmp.AllowUnexported(metricCall{})
expLabels := []metrics.Label{{
Name: "server_type",
Value: "internal",
}}
expectedGauge := []metricCall{
{key: []string{"testing", "grpc", "server", "connections"}, val: 1},
{key: []string{"testing", "grpc", "server", "streams"}, val: 1},
{key: []string{"testing", "grpc", "server", "connections"}, val: 0},
{key: []string{"testing", "grpc", "server", "streams"}, val: 0},
{key: []string{"testing", "grpc", "server", "connections"}, val: 1, labels: expLabels},
{key: []string{"testing", "grpc", "server", "streams"}, val: 1, labels: expLabels},
{key: []string{"testing", "grpc", "server", "connections"}, val: 0, labels: expLabels},
{key: []string{"testing", "grpc", "server", "streams"}, val: 0, labels: expLabels},
}
prototest.AssertDeepEqual(t, expectedGauge, sink.gaugeCalls, cmpMetricCalls)

expectedCounter := []metricCall{
{key: []string{"testing", "grpc", "server", "connection", "count"}, val: 1},
{key: []string{"testing", "grpc", "server", "request", "count"}, val: 1},
{key: []string{"testing", "grpc", "server", "stream", "count"}, val: 1},
{key: []string{"testing", "grpc", "server", "connection", "count"}, val: 1, labels: expLabels},
{key: []string{"testing", "grpc", "server", "request", "count"}, val: 1, labels: expLabels},
{key: []string{"testing", "grpc", "server", "stream", "count"}, val: 1, labels: expLabels},
}
prototest.AssertDeepEqual(t, expectedCounter, sink.incrCounterCalls, cmpMetricCalls)
}
Expand Down