Skip to content

Commit

Permalink
internal/dispatch: extract combined dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
jzelinskie committed Dec 6, 2021
1 parent cdd4c7f commit a75c641
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 58 deletions.
69 changes: 11 additions & 58 deletions cmd/spicedb/serve.go
Expand Up @@ -2,15 +2,13 @@ package main

import (
"context"
"crypto/tls"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/alecthomas/units"
"github.com/authzed/grpcutil"
"github.com/fatih/color"
grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
grpczerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2"
Expand All @@ -22,7 +20,6 @@ import (
"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/authzed/spicedb/internal/auth"
"github.com/authzed/spicedb/internal/dashboard"
Expand All @@ -32,20 +29,15 @@ import (
"github.com/authzed/spicedb/internal/datastore/memdb"
"github.com/authzed/spicedb/internal/datastore/postgres"
"github.com/authzed/spicedb/internal/datastore/proxy"
"github.com/authzed/spicedb/internal/dispatch/caching"
"github.com/authzed/spicedb/internal/dispatch/graph"
"github.com/authzed/spicedb/internal/dispatch/remote"
combineddispatch "github.com/authzed/spicedb/internal/dispatch/combined"
"github.com/authzed/spicedb/internal/gateway"
"github.com/authzed/spicedb/internal/middleware/servicespecific"
"github.com/authzed/spicedb/internal/namespace"
v1 "github.com/authzed/spicedb/internal/proto/dispatch/v1"
"github.com/authzed/spicedb/internal/services"
clusterdispatch "github.com/authzed/spicedb/internal/services/dispatch"
v1alpha1svc "github.com/authzed/spicedb/internal/services/v1alpha1"
logmw "github.com/authzed/spicedb/pkg/middleware/logging"
"github.com/authzed/spicedb/pkg/middleware/requestid"
"github.com/authzed/spicedb/pkg/validationfile"
"github.com/authzed/spicedb/pkg/x509util"
)

func registerServeCmd(rootCmd *cobra.Command) {
Expand Down Expand Up @@ -282,57 +274,18 @@ func serveRun(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("failed to create redispatch gRPC server")
}

cachingRedispatch, err := caching.NewCachingDispatcher(nil, "dispatch_client")
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize redispatcher cache")
}

redispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds)

// grpc consistent loadbalancer redispatch configuration
dispatchAddr := cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-addr")
if len(dispatchAddr) > 0 {
log.Info().Str("upstream", dispatchAddr).Msg("configuring grpc consistent load balancer for redispatch")

// default options
opts := []grpc.DialOption{
redispatch, err := combineddispatch.NewDispatcher(nsm, ds, dispatchGrpcServer,
combineddispatch.UpstreamAddr(cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-addr")),
combineddispatch.UpstreamCAPath(cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-ca-path")),
combineddispatch.GrpcPresharedKey(cobrautil.MustGetStringExpanded(cmd, "grpc-preshared-key")),
combineddispatch.GrpcDialOpts(
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"consistent-hashring"}`),
}

// optional CA
peerCAPath := cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-ca-path")
if len(peerCAPath) > 0 {
log.Info().Str("certpath", peerCAPath).Err(err).Msg("loading CA cert for dispatch cluster")
pool, err := x509util.CustomCertPool(peerCAPath)
if err != nil {
log.Fatal().Str("certpath", peerCAPath).Err(err).Msg("error loading certs for dispatch")
}
creds := credentials.NewTLS(&tls.Config{RootCAs: pool})
opts = append(opts, grpc.WithTransportCredentials(creds))
opts = append(opts, grpcutil.WithBearerToken(cobrautil.MustGetStringExpanded(cmd, "grpc-preshared-key")))
} else {
opts = append(opts, grpcutil.WithInsecureBearerToken(cobrautil.MustGetStringExpanded(cmd, "grpc-preshared-key")))
opts = append(opts, grpc.WithInsecure())
}

conn, err := grpc.Dial(dispatchAddr, opts...)
if err != nil {
log.Fatal().Str("endpoint", dispatchAddr).Err(err).Msg("error constructing client for endpoint")
}
redispatch = remote.NewClusterDispatcher(v1.NewDispatchServiceClient(conn))
}

cachingRedispatch.SetDelegate(redispatch)

clusterDispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds)
cachingClusterDispatch, err := caching.NewCachingDispatcher(nil, "dispatch")
),
)
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize cluster dispatcher cache")
log.Fatal().Err(err).Msg("failed when configuring dispatch")
}
cachingClusterDispatch.SetDelegate(clusterDispatch)

clusterdispatch.RegisterGrpcServices(dispatchGrpcServer, cachingClusterDispatch)

prefixRequiredOption := v1alpha1svc.PrefixRequired
if !cobrautil.MustGetBool(cmd, "schema-prefixes-required") {
Expand All @@ -348,7 +301,7 @@ func serveRun(cmd *cobra.Command, args []string) {
grpcServer,
ds,
nsm,
cachingRedispatch,
redispatch,
cobrautil.MustGetUint32(cmd, "dispatch-max-depth"),
prefixRequiredOption,
v1SchemaServiceOption,
Expand Down Expand Up @@ -436,7 +389,7 @@ func serveRun(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("failed while shutting down namespace manager")
}

if err := cachingRedispatch.Close(); err != nil {
if err := redispatch.Close(); err != nil {
log.Fatal().Err(err).Msg("failed while shutting down dispatcher")
}

Expand Down
112 changes: 112 additions & 0 deletions internal/dispatch/combined/combined.go
@@ -0,0 +1,112 @@
// Package combined implements a dispatcher that combines caching,
// redispatching and optional cluster dispatching.
package combined

import (
"os"

"github.com/authzed/grpcutil"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"

"github.com/authzed/spicedb/internal/datastore"
"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/internal/dispatch/caching"
"github.com/authzed/spicedb/internal/dispatch/graph"
"github.com/authzed/spicedb/internal/dispatch/remote"
"github.com/authzed/spicedb/internal/namespace"
v1 "github.com/authzed/spicedb/internal/proto/dispatch/v1"
dispatchSvc "github.com/authzed/spicedb/internal/services/dispatch"
)

// Option is a function-style option for configuring a combined Dispatcher.
type Option func(*optionState)

type optionState struct {
upstreamAddr string
upstreamCAPath string
grpcPresharedKey string
grpcDialOpts []grpc.DialOption
}

// UpstreamAddr sets the optional cluster dispatching upstream address.
func UpstreamAddr(addr string) Option {
return func(state *optionState) {
state.upstreamAddr = addr
}
}

// UpstreamAddr sets the optional cluster dispatching upstream certificate
// authority.
func UpstreamCAPath(path string) Option {
return func(state *optionState) {
state.upstreamCAPath = path
}
}

// GrpcPresharedKey sets the preshared key used to authenticate for optional
// cluster dispatching.
func GrpcPresharedKey(key string) Option {
return func(state *optionState) {
state.grpcPresharedKey = key
}
}

// GrpcDialOpts sets the default DialOptions used for gRPC clients
// connecting to the optional cluster dispatching.
func GrpcDialOpts(opts ...grpc.DialOption) Option {
return func(state *optionState) {
state.grpcDialOpts = opts
}
}

// NewDispatcher initializes a Dispatcher that caches and redispatches
// optionally to the provided upstream.
func NewDispatcher(nsm namespace.Manager, ds datastore.Datastore, srv *grpc.Server, options ...Option) (dispatch.Dispatcher, error) {
var opts optionState
for _, fn := range options {
fn(&opts)
}
log.Debug().Interface("dispatchConfig", opts).Msg("configured combined dispatcher")

cachingRedispatch, err := caching.NewCachingDispatcher(nil, "dispatch_client")
if err != nil {
return nil, err
}

redispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds)

// If an upstream is specified, create a cluster dispatcher.
if opts.upstreamAddr != "" {
if opts.upstreamCAPath != "" {
// Ensure that the CA path exists.
if _, err := os.Stat(opts.upstreamCAPath); err != nil {
return nil, err
}
opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithCustomCerts(opts.upstreamCAPath, grpcutil.VerifyCA))
opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithBearerToken(opts.grpcPresharedKey))
} else {
opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithInsecureBearerToken(opts.grpcPresharedKey))
opts.grpcDialOpts = append(opts.grpcDialOpts, grpc.WithInsecure())
}

conn, err := grpc.Dial(opts.upstreamAddr, opts.grpcDialOpts...)
if err != nil {
return nil, err
}
redispatch = remote.NewClusterDispatcher(v1.NewDispatchServiceClient(conn))
}

cachingRedispatch.SetDelegate(redispatch)

clusterDispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds)
cachingClusterDispatch, err := caching.NewCachingDispatcher(nil, "dispatch")
if err != nil {
return nil, err
}
cachingClusterDispatch.SetDelegate(clusterDispatch)

dispatchSvc.RegisterGrpcServices(srv, cachingClusterDispatch)

return cachingClusterDispatch, nil
}

0 comments on commit a75c641

Please sign in to comment.