Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC server metrics #14922

Merged
merged 7 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/14922.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
grpc: Added metrics for external gRPC server. Added `server_type=internal|external` label to gRPC metrics.
```
5 changes: 4 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,10 @@ func (a *Agent) Start(ctx context.Context) error {

// This needs to happen after the initial auto-config is loaded, because TLS
// can only be configured on the gRPC server at the point of creation.
a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external"))
a.externalGRPCServer = external.NewServer(
a.logger.Named("grpc.external"),
metrics.Default(),
)

if err := a.startLicenseManager(ctx); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
s.externalConnectCAServer.Register(srv)
}

return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register)
return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register, nil)
}

func (s *Server) connectCARootsMonitor(ctx context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
oldNotify()
}
}
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"))
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil)
srv, err := NewServer(c, deps, grpcServer)
if err != nil {
return nil, err
Expand Down
15 changes: 14 additions & 1 deletion agent/grpc-external/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package external
import (
"time"

"github.com/armon/go-metrics"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc"
Expand All @@ -11,20 +12,32 @@ import (
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
)

var (
// metricsLabels is passed to the stats handler and the active-stream counter
// created in NewServer, so that metrics emitted by the external gRPC server
// carry the server_type=external label.
metricsLabels = []metrics.Label{{
Name: "server_type",
Value: "external",
}}
)

// NewServer constructs a gRPC server for the external gRPC port, to which
// handlers can be registered.
func NewServer(logger agentmiddleware.Logger) *grpc.Server {
func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics) *grpc.Server {
if metricsObj == nil {
metricsObj = metrics.Default()
}
recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger)

opts := []grpc.ServerOption{
grpc.MaxConcurrentStreams(2048),
grpc.StatsHandler(agentmiddleware.NewStatsHandler(metricsObj, metricsLabels)),
middleware.WithUnaryServerChain(
// Add middleware interceptors to recover in case of panics.
recovery.UnaryServerInterceptor(recoveryOpts...),
),
middleware.WithStreamServerChain(
// Add middleware interceptors to recover in case of panics.
recovery.StreamServerInterceptor(recoveryOpts...),
agentmiddleware.NewActiveStreamCounter(metricsObj, metricsLabels).Intercept,
),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
// This must be less than the keepalive.ClientParameters Time setting, otherwise
Expand Down
104 changes: 104 additions & 0 deletions agent/grpc-external/stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package external

import (
"context"
"net"
"sort"
"testing"

"github.com/armon/go-metrics"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/grpc-middleware/testutil"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
"github.com/hashicorp/consul/proto/prototest"
)

// TestServer_EmitsStats starts an external gRPC server backed by a fake
// metrics sink, opens a single streaming RPC against it, shuts everything
// down, and then asserts that the recorded gauge and counter calls all carry
// the server_type=external label.
func TestServer_EmitsStats(t *testing.T) {
Copy link
Author

@pglass pglass Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This copies the TestHandler_EmitStats test. I did some deduplication of the helpers by moving things over to grpc-middleware/testutil. (I could do more deduplication but kind of wanted a sanity check before I go too far.)

sink, metricsObj := testutil.NewFakeSink(t)

srv := NewServer(hclog.Default(), metricsObj)

testservice.RegisterSimpleServer(srv, &testservice.Simple{})

lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// Serve in the background; the errgroup lets the test wait for Serve to
// return and surfaces its error.
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return srv.Serve(lis)
})
// Safety net for early test failures: stop the server and wait for the
// Serve goroutine so it does not outlive the test.
t.Cleanup(func() {
srv.Stop()
if err := g.Wait(); err != nil {
t.Logf("grpc server error: %v", err)
}
})

conn, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure())
require.NoError(t, err)
t.Cleanup(func() { conn.Close() })

client := testservice.NewSimpleClient(conn)
fClient, err := client.Flow(ctx, &testservice.Req{Datacenter: "mine"})
require.NoError(t, err)

// Wait for the first event so that we know the stream is sending.
_, err = fClient.Recv()
require.NoError(t, err)

// Tear down the client side first (cancel the stream context and close the
// connection), then stop the server gracefully.
cancel()
conn.Close()
srv.GracefulStop()
// Wait for the server to stop so that active_streams is predictable.
require.NoError(t, g.Wait())

// Occasionally the active_stream=0 metric may be emitted before the
// active_conns=0 metric. The order of those metrics is not really important
// so we sort the calls to match the expected.
sort.Slice(sink.GaugeCalls, func(i, j int) bool {
// Comparisons that involve either of the first two positions keep index
// order; the same applies when a key has fewer than four segments.
if i < 2 || j < 2 {
return i < j
}
if len(sink.GaugeCalls[i].Key) < 4 || len(sink.GaugeCalls[j].Key) < 4 {
return i < j
}
// Otherwise order by the fourth key segment ("connections" before "streams").
return sink.GaugeCalls[i].Key[3] < sink.GaugeCalls[j].Key[3]
})

cmpMetricCalls := cmp.AllowUnexported(testutil.MetricCall{})
expLabels := []metrics.Label{{
Name: "server_type",
Value: "external",
}}
// NOTE(review): the "testing" key prefix presumably comes from the sink
// configuration in testutil.NewFakeSink — confirm.
expectedGauge := []testutil.MetricCall{
{Key: []string{"testing", "grpc", "server", "connections"}, Val: 1, Labels: expLabels},
{Key: []string{"testing", "grpc", "server", "streams"}, Val: 1, Labels: expLabels},
{Key: []string{"testing", "grpc", "server", "connections"}, Val: 0, Labels: expLabels},
{Key: []string{"testing", "grpc", "server", "streams"}, Val: 0, Labels: expLabels},
}
prototest.AssertDeepEqual(t, expectedGauge, sink.GaugeCalls, cmpMetricCalls)

expectedCounter := []testutil.MetricCall{
{Key: []string{"testing", "grpc", "server", "connection", "count"}, Val: 1, Labels: expLabels},
{Key: []string{"testing", "grpc", "server", "request", "count"}, Val: 1, Labels: expLabels},
{Key: []string{"testing", "grpc", "server", "stream", "count"}, Val: 1, Labels: expLabels},
}
prototest.AssertDeepEqual(t, expectedCounter, sink.IncrCounterCalls, cmpMetricCalls)
}

// logError adapts f, a function that may fail, into a plain func() (for
// example for use with t.Cleanup or defer). When the returned function is
// invoked it calls f exactly once and, if f returns a non-nil error, records
// that error in the test log. The error never fails the test.
func logError(t *testing.T, f func() error) func() {
	return func() {
		if err := f(); err != nil {
			// Pass the error as an argument instead of using its text as the
			// format string, so any '%' in the message is logged verbatim
			// rather than being interpreted as a formatting verb.
			t.Logf("%v", err)
		}
	}
}
4 changes: 3 additions & 1 deletion agent/grpc-internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/armon/go-metrics"
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/tlsutil"
Expand Down Expand Up @@ -129,7 +131,7 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien
grpc.WithInsecure(),
grpc.WithContextDialer(c.dialer),
grpc.WithDisableRetry(),
grpc.WithStatsHandler(newStatsHandler(defaultMetrics())),
grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(metrics.Default(), metricsLabels)),
// nolint:staticcheck // there is no other supported alternative to WithBalancerName
grpc.WithBalancerName("pick_first"),
// Keep alive parameters are based on the same default ones we used for
Expand Down
2 changes: 1 addition & 1 deletion agent/grpc-internal/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/grpc-internal/internal/testservice"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/sdk/freeport"
Expand Down
18 changes: 14 additions & 4 deletions agent/grpc-internal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"time"

"github.com/armon/go-metrics"
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"

middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -13,26 +14,35 @@ import (
"google.golang.org/grpc/keepalive"
)

var (
// metricsLabels is passed to the stats handler and the active-stream counter
// created in NewHandler, so that metrics emitted by the internal gRPC
// server carry the server_type=internal label.
metricsLabels = []metrics.Label{{
Name: "server_type",
Value: "internal",
}}
)

// NewHandler returns a gRPC server that accepts connections from Handle(conn).
// The register function will be called with the grpc.Server to register
// gRPC services with the server.
func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server)) *Handler {
metrics := defaultMetrics()
func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server), metricsObj *metrics.Metrics) *Handler {
if metricsObj == nil {
metricsObj = metrics.Default()
}

// We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured.
recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger)

opts := []grpc.ServerOption{
grpc.StatsHandler(newStatsHandler(metrics)),
grpc.StatsHandler(agentmiddleware.NewStatsHandler(metricsObj, metricsLabels)),
middleware.WithUnaryServerChain(
// Add middleware interceptors to recover in case of panics.
recovery.UnaryServerInterceptor(recoveryOpts...),
),
middleware.WithStreamServerChain(
// Add middleware interceptors to recover in case of panics.
recovery.StreamServerInterceptor(recoveryOpts...),
(&activeStreamCounter{metrics: metrics}).Intercept,
agentmiddleware.NewActiveStreamCounter(metricsObj, metricsLabels).Intercept,
),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 15 * time.Second,
Expand Down
5 changes: 3 additions & 2 deletions agent/grpc-internal/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/consul/agent/grpc-internal/internal/testservice"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
)

func TestHandler_PanicRecoveryInterceptor(t *testing.T) {
Expand Down Expand Up @@ -57,5 +57,6 @@ func TestHandler_PanicRecoveryInterceptor(t *testing.T) {
// Checking the entire stack trace is not possible, let's
// make sure that it contains a couple of expected strings.
require.Contains(t, strLog, `[ERROR] panic serving grpc request: panic="panic from Something`)
require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc-internal.(*simplePanic).Something`)
require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice.(*SimplePanic).Something`)

}
46 changes: 4 additions & 42 deletions agent/grpc-internal/server_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package internal

import (
"context"
"crypto/tls"
"fmt"
"io"
Expand All @@ -15,7 +14,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/hashicorp/consul/agent/grpc-internal/internal/testservice"
"github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/tlsutil"
Expand All @@ -42,20 +41,20 @@ func (s testServer) Metadata() *metadata.Server {

// newSimpleTestServer sets up a test server via newTestServer, using the
// default hclog logger, with the simple (non-panicking) test service
// registered under the given name and datacenter.
func newSimpleTestServer(t *testing.T, name, dc string, tlsConf *tlsutil.Configurator) testServer {
return newTestServer(t, hclog.Default(), name, dc, tlsConf, func(server *grpc.Server) {
testservice.RegisterSimpleServer(server, &simple{name: name, dc: dc})
testservice.RegisterSimpleServer(server, &testservice.Simple{Name: name, DC: dc})
})
}

// newPanicTestServer sets up a simple server with handlers that panic.
// Unlike newSimpleTestServer, the caller supplies the logger that is passed
// through to newTestServer.
func newPanicTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator) testServer {
return newTestServer(t, logger, name, dc, tlsConf, func(server *grpc.Server) {
testservice.RegisterSimpleServer(server, &simplePanic{name: name, dc: dc})
testservice.RegisterSimpleServer(server, &testservice.SimplePanic{Name: name, DC: dc})
})
}

func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator, register func(server *grpc.Server)) testServer {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(logger, addr, register)
handler := NewHandler(logger, addr, register, nil)

lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
Expand Down Expand Up @@ -95,43 +94,6 @@ func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *
}
}

// simple is a test implementation of the Simple service that answers with
// its configured datacenter (and, for Something, its configured name).
type simple struct {
	name, dc string
}

// Flow keeps sending responses on the stream, pausing one millisecond between
// sends, until the stream's context reports an error (returns nil) or a send
// fails (returns that error).
func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
	for {
		if flow.Context().Err() != nil {
			return nil
		}
		sendErr := flow.Send(&testservice.Resp{ServerName: "one", Datacenter: s.dc})
		if sendErr != nil {
			return sendErr
		}
		time.Sleep(time.Millisecond)
	}
}

// Something returns a single response carrying the server's name and
// datacenter; it never returns an error.
func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
	reply := &testservice.Resp{
		ServerName: s.name,
		Datacenter: s.dc,
	}
	return reply, nil
}

// simplePanic is a test implementation of the Simple service whose handlers
// panic, used to exercise the server's panic-recovery handling.
type simplePanic struct {
	name string
	dc   string
}

// Flow returns nil if the stream's context already reports an error;
// otherwise it sleeps for a millisecond and panics.
func (s *simplePanic) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
	if flow.Context().Err() != nil {
		return nil
	}
	time.Sleep(time.Millisecond)
	panic("panic from Flow")
}

// Something sleeps for a millisecond and then panics; it never returns.
func (s *simplePanic) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
	const pause = time.Millisecond
	time.Sleep(pause)
	panic("panic from Something")
}

// fakeRPCListener mimics agent/consul.Server.listen to handle the RPCType byte.
// In the future we should be able to refactor Server and extract this RPC
// handling logic so that we don't need to use a fake.
Expand Down
13 changes: 9 additions & 4 deletions agent/grpc-internal/services/subscribe/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,15 @@ var _ Backend = (*testBackend)(nil)
func runTestServer(t *testing.T, server *Server) net.Addr {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
var grpcServer *gogrpc.Server
handler := grpc.NewHandler(hclog.New(nil), addr, func(srv *gogrpc.Server) {
grpcServer = srv
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server)
})
handler := grpc.NewHandler(
hclog.New(nil),
addr,
func(srv *gogrpc.Server) {
grpcServer = srv
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server)
},
nil,
)

lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
Expand Down