Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/dispatch: extract combined dispatcher #321

Merged
merged 1 commit into from Dec 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 11 additions & 58 deletions cmd/spicedb/serve.go
Expand Up @@ -2,15 +2,13 @@ package main

import (
"context"
"crypto/tls"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/alecthomas/units"
"github.com/authzed/grpcutil"
"github.com/fatih/color"
grpcauth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
grpczerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2"
Expand All @@ -22,7 +20,6 @@ import (
"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/authzed/spicedb/internal/auth"
"github.com/authzed/spicedb/internal/dashboard"
Expand All @@ -32,20 +29,15 @@ import (
"github.com/authzed/spicedb/internal/datastore/memdb"
"github.com/authzed/spicedb/internal/datastore/postgres"
"github.com/authzed/spicedb/internal/datastore/proxy"
"github.com/authzed/spicedb/internal/dispatch/caching"
"github.com/authzed/spicedb/internal/dispatch/graph"
"github.com/authzed/spicedb/internal/dispatch/remote"
combineddispatch "github.com/authzed/spicedb/internal/dispatch/combined"
"github.com/authzed/spicedb/internal/gateway"
"github.com/authzed/spicedb/internal/middleware/servicespecific"
"github.com/authzed/spicedb/internal/namespace"
v1 "github.com/authzed/spicedb/internal/proto/dispatch/v1"
"github.com/authzed/spicedb/internal/services"
clusterdispatch "github.com/authzed/spicedb/internal/services/dispatch"
v1alpha1svc "github.com/authzed/spicedb/internal/services/v1alpha1"
logmw "github.com/authzed/spicedb/pkg/middleware/logging"
"github.com/authzed/spicedb/pkg/middleware/requestid"
"github.com/authzed/spicedb/pkg/validationfile"
"github.com/authzed/spicedb/pkg/x509util"
)

func registerServeCmd(rootCmd *cobra.Command) {
Expand Down Expand Up @@ -282,57 +274,18 @@ func serveRun(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("failed to create redispatch gRPC server")
}

cachingRedispatch, err := caching.NewCachingDispatcher(nil, "dispatch_client")
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize redispatcher cache")
}

redispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds)

// grpc consistent loadbalancer redispatch configuration
dispatchAddr := cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-addr")
if len(dispatchAddr) > 0 {
log.Info().Str("upstream", dispatchAddr).Msg("configuring grpc consistent load balancer for redispatch")

// default options
opts := []grpc.DialOption{
redispatch, err := combineddispatch.NewDispatcher(nsm, ds, dispatchGrpcServer,
combineddispatch.UpstreamAddr(cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-addr")),
combineddispatch.UpstreamCAPath(cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-ca-path")),
combineddispatch.GrpcPresharedKey(cobrautil.MustGetStringExpanded(cmd, "grpc-preshared-key")),
combineddispatch.GrpcDialOpts(
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"consistent-hashring"}`),
}

// optional CA
peerCAPath := cobrautil.MustGetStringExpanded(cmd, "dispatch-upstream-ca-path")
if len(peerCAPath) > 0 {
log.Info().Str("certpath", peerCAPath).Err(err).Msg("loading CA cert for dispatch cluster")
pool, err := x509util.CustomCertPool(peerCAPath)
if err != nil {
log.Fatal().Str("certpath", peerCAPath).Err(err).Msg("error loading certs for dispatch")
}
creds := credentials.NewTLS(&tls.Config{RootCAs: pool})
opts = append(opts, grpc.WithTransportCredentials(creds))
opts = append(opts, grpcutil.WithBearerToken(cobrautil.MustGetStringExpanded(cmd, "grpc-preshared-key")))
} else {
opts = append(opts, grpcutil.WithInsecureBearerToken(cobrautil.MustGetStringExpanded(cmd, "grpc-preshared-key")))
opts = append(opts, grpc.WithInsecure())
}

conn, err := grpc.Dial(dispatchAddr, opts...)
if err != nil {
log.Fatal().Str("endpoint", dispatchAddr).Err(err).Msg("error constructing client for endpoint")
}
redispatch = remote.NewClusterDispatcher(v1.NewDispatchServiceClient(conn))
}

cachingRedispatch.SetDelegate(redispatch)

clusterDispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds)
cachingClusterDispatch, err := caching.NewCachingDispatcher(nil, "dispatch")
),
)
if err != nil {
log.Fatal().Err(err).Msg("failed to initialize cluster dispatcher cache")
log.Fatal().Err(err).Msg("failed when configuring dispatch")
}
cachingClusterDispatch.SetDelegate(clusterDispatch)

clusterdispatch.RegisterGrpcServices(dispatchGrpcServer, cachingClusterDispatch)

prefixRequiredOption := v1alpha1svc.PrefixRequired
if !cobrautil.MustGetBool(cmd, "schema-prefixes-required") {
Expand All @@ -348,7 +301,7 @@ func serveRun(cmd *cobra.Command, args []string) {
grpcServer,
ds,
nsm,
cachingRedispatch,
redispatch,
cobrautil.MustGetUint32(cmd, "dispatch-max-depth"),
prefixRequiredOption,
v1SchemaServiceOption,
Expand Down Expand Up @@ -436,7 +389,7 @@ func serveRun(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("failed while shutting down namespace manager")
}

if err := cachingRedispatch.Close(); err != nil {
if err := redispatch.Close(); err != nil {
log.Fatal().Err(err).Msg("failed while shutting down dispatcher")
}

Expand Down
112 changes: 112 additions & 0 deletions internal/dispatch/combined/combined.go
@@ -0,0 +1,112 @@
// Package combined implements a dispatcher that combines caching,
// redispatching and optional cluster dispatching.
package combined

import (
"os"

"github.com/authzed/grpcutil"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"

"github.com/authzed/spicedb/internal/datastore"
"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/internal/dispatch/caching"
"github.com/authzed/spicedb/internal/dispatch/graph"
"github.com/authzed/spicedb/internal/dispatch/remote"
"github.com/authzed/spicedb/internal/namespace"
v1 "github.com/authzed/spicedb/internal/proto/dispatch/v1"
dispatchSvc "github.com/authzed/spicedb/internal/services/dispatch"
)

// Option is a function-style option for configuring a combined Dispatcher.
type Option func(*optionState)

type optionState struct {
upstreamAddr string
upstreamCAPath string
grpcPresharedKey string
grpcDialOpts []grpc.DialOption
}

// UpstreamAddr sets the optional cluster dispatching upstream address.
func UpstreamAddr(addr string) Option {
return func(state *optionState) {
state.upstreamAddr = addr
}
}

// UpstreamAddr sets the optional cluster dispatching upstream certificate
// authority.
func UpstreamCAPath(path string) Option {
return func(state *optionState) {
state.upstreamCAPath = path
}
}

// GrpcPresharedKey sets the preshared key used to authenticate for optional
// cluster dispatching.
func GrpcPresharedKey(key string) Option {
return func(state *optionState) {
state.grpcPresharedKey = key
}
}

// GrpcDialOpts sets the default DialOptions used for gRPC clients
// connecting to the optional cluster dispatching.
func GrpcDialOpts(opts ...grpc.DialOption) Option {
return func(state *optionState) {
state.grpcDialOpts = opts
}
}

// NewDispatcher initializes a Dispatcher that caches and redispatches
// optionally to the provided upstream.
func NewDispatcher(nsm namespace.Manager, ds datastore.Datastore, srv *grpc.Server, options ...Option) (dispatch.Dispatcher, error) {
var opts optionState
for _, fn := range options {
fn(&opts)
}
log.Debug().Interface("dispatchConfig", opts).Msg("configured combined dispatcher")

cachingRedispatch, err := caching.NewCachingDispatcher(nil, "dispatch_client")
if err != nil {
return nil, err
}

redispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds)

// If an upstream is specified, create a cluster dispatcher.
if opts.upstreamAddr != "" {
if opts.upstreamCAPath != "" {
// Ensure that the CA path exists.
if _, err := os.Stat(opts.upstreamCAPath); err != nil {
return nil, err
}
opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithCustomCerts(opts.upstreamCAPath, grpcutil.VerifyCA))
opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithBearerToken(opts.grpcPresharedKey))
} else {
opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithInsecureBearerToken(opts.grpcPresharedKey))
opts.grpcDialOpts = append(opts.grpcDialOpts, grpc.WithInsecure())
}

conn, err := grpc.Dial(opts.upstreamAddr, opts.grpcDialOpts...)
if err != nil {
return nil, err
}
redispatch = remote.NewClusterDispatcher(v1.NewDispatchServiceClient(conn))
}

cachingRedispatch.SetDelegate(redispatch)

clusterDispatch := graph.NewDispatcher(cachingRedispatch, nsm, ds)
cachingClusterDispatch, err := caching.NewCachingDispatcher(nil, "dispatch")
if err != nil {
return nil, err
}
cachingClusterDispatch.SetDelegate(clusterDispatch)

dispatchSvc.RegisterGrpcServices(srv, cachingClusterDispatch)

return cachingClusterDispatch, nil
}