From a75c641da1fd81ef609d7716c878434ce94e0199 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Sun, 5 Dec 2021 15:08:16 -0500 Subject: [PATCH] internal/dispatch: extract combined dispatcher --- cmd/spicedb/serve.go | 69 +++------------ internal/dispatch/combined/combined.go | 112 +++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 58 deletions(-) create mode 100644 internal/dispatch/combined/combined.go diff --git a/cmd/spicedb/serve.go b/cmd/spicedb/serve.go index 7cab7ad79d..927c2e3545 100644 --- a/cmd/spicedb/serve.go +++ b/cmd/spicedb/serve.go @@ -2,7 +2,6 @@ package main import ( "context" - "crypto/tls" "fmt" "os" "os/signal" @@ -10,7 +9,6 @@ import ( "time" "github.com/alecthomas/units" - "github.com/authzed/grpcutil" "github.com/fatih/color" grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/auth" grpczerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2" @@ -22,7 +20,6 @@ import ( "github.com/spf13/cobra" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" "github.com/authzed/spicedb/internal/auth" "github.com/authzed/spicedb/internal/dashboard" @@ -32,20 +29,15 @@ import ( "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/datastore/postgres" "github.com/authzed/spicedb/internal/datastore/proxy" - "github.com/authzed/spicedb/internal/dispatch/caching" - "github.com/authzed/spicedb/internal/dispatch/graph" - "github.com/authzed/spicedb/internal/dispatch/remote" + combineddispatch "github.com/authzed/spicedb/internal/dispatch/combined" "github.com/authzed/spicedb/internal/gateway" "github.com/authzed/spicedb/internal/middleware/servicespecific" "github.com/authzed/spicedb/internal/namespace" - v1 "github.com/authzed/spicedb/internal/proto/dispatch/v1" "github.com/authzed/spicedb/internal/services" - clusterdispatch "github.com/authzed/spicedb/internal/services/dispatch" v1alpha1svc "github.com/authzed/spicedb/internal/services/v1alpha1" logmw "github.com/authzed/spicedb/pkg/middleware/logging" "github.com/authzed/spicedb/pkg/middleware/requestid" "github.com/authzed/spicedb/pkg/validationfile" - "github.com/authzed/spicedb/pkg/x509util" ) func registerServeCmd(rootCmd *cobra.Command) { @@ -282,57 +274,18 @@ func serveRun(cmd *cobra.Command, args []string) { log.Fatal().Err(err).Msg("failed to create redispatch gRPC server") } - cachingRedispatch, err := caching.NewCachingDispatcher(nil, "dispatch_client") - if err != nil { - log.Fatal().Err(err).Msg("failed to initialize redispatcher cache") - } - - redispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds) - - // grpc consistent loadbalancer redispatch configuration - dispatchAddr := cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-addr") - if len(dispatchAddr) > 0 { - log.Info().Str("upstream", dispatchAddr).Msg("configuring grpc consistent load balancer for redispatch") - - // default options - opts := []grpc.DialOption{ + redispatch, err := combineddispatch.NewDispatcher(nsm, ds, dispatchGrpcServer, + combineddispatch.UpstreamAddr(cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-addr")), + combineddispatch.UpstreamCAPath(cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-ca-path")), + combineddispatch.GrpcPresharedKey(cobrautil.MustGetStringExpanded(cmd, "grpc-preshared-key")), + combineddispatch.GrpcDialOpts( grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"consistent-hashring"}`), - } - - // optional CA - peerCAPath := cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-ca-path") - if len(peerCAPath) > 0 { - log.Info().Str("certpath", peerCAPath).Err(err).Msg("loading CA cert for dispatch cluster") - pool, err := x509util.CustomCertPool(peerCAPath) - if err != nil { - log.Fatal().Str("certpath", peerCAPath).Err(err).Msg("error loading certs for dispatch") - } - creds := credentials.NewTLS(&tls.Config{RootCAs: pool}) - opts = append(opts, grpc.WithTransportCredentials(creds)) - opts = append(opts, grpcutil.WithBearerToken(cobrautil.MustGetStringExpanded(cmd, "grpc-preshared-key"))) - } else { - opts = append(opts, grpcutil.WithInsecureBearerToken(cobrautil.MustGetStringExpanded(cmd, "grpc-preshared-key"))) - opts = append(opts, grpc.WithInsecure()) - } - - conn, err := grpc.Dial(dispatchAddr, opts...) - if err != nil { - log.Fatal().Str("endpoint", dispatchAddr).Err(err).Msg("error constructing client for endpoint") - } - redispatch = remote.NewClusterDispatcher(v1.NewDispatchServiceClient(conn)) - } - - cachingRedispatch.SetDelegate(redispatch) - - clusterDispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds) - cachingClusterDispatch, err := caching.NewCachingDispatcher(nil, "dispatch") + ), + ) if err != nil { - log.Fatal().Err(err).Msg("failed to initialize cluster dispatcher cache") + log.Fatal().Err(err).Msg("failed when configuring dispatch") } - cachingClusterDispatch.SetDelegate(clusterDispatch) - - clusterdispatch.RegisterGrpcServices(dispatchGrpcServer, cachingClusterDispatch) prefixRequiredOption := v1alpha1svc.PrefixRequired if !cobrautil.MustGetBool(cmd, "schema-prefixes-required") { @@ -348,7 +301,7 @@ func serveRun(cmd *cobra.Command, args []string) { grpcServer, ds, nsm, - cachingRedispatch, + redispatch, cobrautil.MustGetUint32(cmd, "dispatch-max-depth"), prefixRequiredOption, v1SchemaServiceOption, @@ -436,7 +389,7 @@ func serveRun(cmd *cobra.Command, args []string) { log.Fatal().Err(err).Msg("failed while shutting down namespace manager") } - if err := cachingRedispatch.Close(); err != nil { + if err := redispatch.Close(); err != nil { log.Fatal().Err(err).Msg("failed while shutting down dispatcher") } diff --git a/internal/dispatch/combined/combined.go b/internal/dispatch/combined/combined.go new file mode 100644 index 0000000000..fde49f4b6b --- /dev/null +++ b/internal/dispatch/combined/combined.go @@ -0,0 +1,112 @@ +// Package combined implements a dispatcher that combines caching, +// redispatching and optional cluster dispatching. +package combined + +import ( + "os" + + "github.com/authzed/grpcutil" + "github.com/rs/zerolog/log" + "google.golang.org/grpc" + + "github.com/authzed/spicedb/internal/datastore" + "github.com/authzed/spicedb/internal/dispatch" + "github.com/authzed/spicedb/internal/dispatch/caching" + "github.com/authzed/spicedb/internal/dispatch/graph" + "github.com/authzed/spicedb/internal/dispatch/remote" + "github.com/authzed/spicedb/internal/namespace" + v1 "github.com/authzed/spicedb/internal/proto/dispatch/v1" + dispatchSvc "github.com/authzed/spicedb/internal/services/dispatch" +) + +// Option is a function-style option for configuring a combined Dispatcher. +type Option func(*optionState) + +type optionState struct { + upstreamAddr string + upstreamCAPath string + grpcPresharedKey string + grpcDialOpts []grpc.DialOption +} + +// UpstreamAddr sets the optional cluster dispatching upstream address. +func UpstreamAddr(addr string) Option { + return func(state *optionState) { + state.upstreamAddr = addr + } +} + +// UpstreamAddr sets the optional cluster dispatching upstream certificate +// authority. +func UpstreamCAPath(path string) Option { + return func(state *optionState) { + state.upstreamCAPath = path + } +} + +// GrpcPresharedKey sets the preshared key used to authenticate for optional +// cluster dispatching. +func GrpcPresharedKey(key string) Option { + return func(state *optionState) { + state.grpcPresharedKey = key + } +} + +// GrpcDialOpts sets the default DialOptions used for gRPC clients +// connecting to the optional cluster dispatching. +func GrpcDialOpts(opts ...grpc.DialOption) Option { + return func(state *optionState) { + state.grpcDialOpts = opts + } +} + +// NewDispatcher initializes a Dispatcher that caches and redispatches +// optionally to the provided upstream. +func NewDispatcher(nsm namespace.Manager, ds datastore.Datastore, srv *grpc.Server, options ...Option) (dispatch.Dispatcher, error) { + var opts optionState + for _, fn := range options { + fn(&opts) + } + log.Debug().Interface("dispatchConfig", opts).Msg("configured combined dispatcher") + + cachingRedispatch, err := caching.NewCachingDispatcher(nil, "dispatch_client") + if err != nil { + return nil, err + } + + redispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds) + + // If an upstream is specified, create a cluster dispatcher. + if opts.upstreamAddr != "" { + if opts.upstreamCAPath != "" { + // Ensure that the CA path exists. + if _, err := os.Stat(opts.upstreamCAPath); err != nil { + return nil, err + } + opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithCustomCerts(opts.upstreamCAPath, grpcutil.VerifyCA)) + opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithBearerToken(opts.grpcPresharedKey)) + } else { + opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithInsecureBearerToken(opts.grpcPresharedKey)) + opts.grpcDialOpts = append(opts.grpcDialOpts, grpc.WithInsecure()) + } + + conn, err := grpc.Dial(opts.upstreamAddr, opts.grpcDialOpts...) + if err != nil { + return nil, err + } + redispatch = remote.NewClusterDispatcher(v1.NewDispatchServiceClient(conn)) + } + + cachingRedispatch.SetDelegate(redispatch) + + clusterDispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds) + cachingClusterDispatch, err := caching.NewCachingDispatcher(nil, "dispatch") + if err != nil { + return nil, err + } + cachingClusterDispatch.SetDelegate(clusterDispatch) + + dispatchSvc.RegisterGrpcServices(srv, cachingClusterDispatch) + + return cachingClusterDispatch, nil +}