Skip to content

Commit

Permalink
xds: de-experimentalize xDS apis required for psm security (#4753)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Sep 15, 2021
1 parent c84a5de commit 4c5f7fb
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 92 deletions.
35 changes: 33 additions & 2 deletions connectivity/connectivity.go
Expand Up @@ -18,7 +18,6 @@

// Package connectivity defines connectivity semantics.
// For details, see https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md.
// All APIs in this package are experimental.
package connectivity

import (
Expand All @@ -45,7 +44,7 @@ func (s State) String() string {
return "SHUTDOWN"
default:
logger.Errorf("unknown connectivity state: %d", s)
return "Invalid-State"
return "INVALID_STATE"
}
}

Expand All @@ -61,3 +60,35 @@ const (
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)

// ServingMode indicates the current mode of operation of the server.
//
// Only xDS enabled gRPC servers currently report their serving mode.
type ServingMode int

const (
// ServingModeStarting indicates that the server is starting up.
ServingModeStarting ServingMode = iota
// ServingModeServing indicates that the server contains all required
// configuration and is serving RPCs.
ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required configuration to serve RPCs.
ServingModeNotServing
)

func (s ServingMode) String() string {
switch s {
case ServingModeStarting:
return "STARTING"
case ServingModeServing:
return "SERVING"
case ServingModeNotServing:
return "NOT_SERVING"
default:
logger.Errorf("unknown serving mode: %d", s)
return "INVALID_MODE"
}
}
5 changes: 0 additions & 5 deletions credentials/xds/xds.go
Expand Up @@ -18,11 +18,6 @@

// Package xds provides a transport credentials implementation where the
// security configuration is pushed by a management server using xDS APIs.
//
// Experimental
//
// Notice: All APIs in this package are EXPERIMENTAL and may be removed in a
// later release.
package xds

import (
Expand Down
49 changes: 10 additions & 39 deletions xds/internal/server/listener_wrapper.go
Expand Up @@ -30,6 +30,7 @@ import (
"unsafe"

"google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
internalbackoff "google.golang.org/grpc/internal/backoff"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
Expand All @@ -51,41 +52,11 @@ var (
backoffFunc = bs.Backoff
)

// ServingMode indicates the current mode of operation of the server.
//
// This API exactly mirrors the one in the public xds package. We have to
// redefine it here to avoid a cyclic dependency.
type ServingMode int

const (
// ServingModeStarting indicates that the serving is starting up.
ServingModeStarting ServingMode = iota
// ServingModeServing indicates the the server contains all required xDS
// configuration is serving RPCs.
ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required xDS configuration to serve RPCs.
ServingModeNotServing
)

func (s ServingMode) String() string {
switch s {
case ServingModeNotServing:
return "not-serving"
case ServingModeServing:
return "serving"
default:
return "starting"
}
}

// ServingModeCallback is the callback that users can register to get notified
// about the server's serving mode changes. The callback is invoked with the
// address of the listener and its new mode. The err parameter is set to a
// non-nil error if the server has transitioned into not-serving mode.
type ServingModeCallback func(addr net.Addr, mode ServingMode, err error)
type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err error)

// DrainCallback is the callback that an xDS-enabled server registers to get
// notified about updates to the Listener configuration. The server is expected
Expand Down Expand Up @@ -208,7 +179,7 @@ type listenerWrapper struct {
// get a Listener resource update).
mu sync.RWMutex
// Current serving mode.
mode ServingMode
mode connectivity.ServingMode
// Filter chains received as part of the last good update.
filterChains *xdsclient.FilterChainManager

Expand Down Expand Up @@ -267,7 +238,7 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
}

l.mu.RLock()
if l.mode == ServingModeNotServing {
if l.mode == connectivity.ServingModeNotServing {
// Close connections as soon as we accept them when we are in
// "not-serving" mode. Since we accept a net.Listener from the user
// in Serve(), we cannot close the listener when we move to
Expand Down Expand Up @@ -390,23 +361,23 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
if update.err != nil {
l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, update.err)
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}
atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates))

l.switchMode(l.filterChains, ServingModeServing, nil)
l.switchMode(l.filterChains, connectivity.ServingModeServing, nil)
l.goodUpdate.Fire()
}

func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
if update.err != nil {
l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, update.err)
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
Expand All @@ -428,7 +399,7 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
// what we have decided to do. See gRPC A36 for more details.
ilc := update.update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
l.switchMode(nil, ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
return
}

Expand All @@ -447,12 +418,12 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
// from the management server, this listener has all the configuration
// needed, and is ready to serve.
if len(ilc.FilterChains.RouteConfigNames) == 0 {
l.switchMode(ilc.FilterChains, ServingModeServing, nil)
l.switchMode(ilc.FilterChains, connectivity.ServingModeServing, nil)
l.goodUpdate.Fire()
}
}

func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode ServingMode, err error) {
func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode connectivity.ServingMode, err error) {
l.mu.Lock()
defer l.mu.Unlock()

Expand Down
33 changes: 17 additions & 16 deletions xds/internal/test/xds_server_serving_mode_test.go
Expand Up @@ -30,6 +30,7 @@ import (

v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
testpb "google.golang.org/grpc/test/grpc_testing"
Expand Down Expand Up @@ -64,8 +65,8 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
}

// Create a couple of channels on which mode updates will be pushed.
updateCh1 := make(chan xds.ServingMode, 1)
updateCh2 := make(chan xds.ServingMode, 1)
updateCh1 := make(chan connectivity.ServingMode, 1)
updateCh2 := make(chan connectivity.ServingMode, 1)

// Create a server option to get notified about serving mode changes, and
// push the updated mode on the channels created above.
Expand Down Expand Up @@ -124,16 +125,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}

Expand Down Expand Up @@ -169,16 +170,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing)
if mode != connectivity.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
}
}

Expand All @@ -203,8 +204,8 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing)
if mode != connectivity.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
}
}

Expand Down Expand Up @@ -233,16 +234,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}

Expand Down
7 changes: 4 additions & 3 deletions xds/server.go
Expand Up @@ -28,6 +28,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -231,7 +232,7 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
ListenerResourceName: name,
XDSCredsInUse: s.xdsCredsInUse,
XDSClient: s.xdsC,
ModeCallback: func(addr net.Addr, mode server.ServingMode, err error) {
ModeCallback: func(addr net.Addr, mode connectivity.ServingMode, err error) {
modeUpdateCh.Put(&modeChangeArgs{
addr: addr,
mode: mode,
Expand Down Expand Up @@ -261,7 +262,7 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
// modeChangeArgs wraps argument required for invoking mode change callback.
type modeChangeArgs struct {
addr net.Addr
mode server.ServingMode
mode connectivity.ServingMode
err error
}

Expand All @@ -278,7 +279,7 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
case u := <-updateCh.Get():
updateCh.Load()
args := u.(*modeChangeArgs)
if args.mode == ServingModeNotServing {
if args.mode == connectivity.ServingModeNotServing {
// We type assert our underlying gRPC server to the real
// grpc.Server here before trying to initiate the drain
// operation. This approach avoids performing the same type
Expand Down
23 changes: 7 additions & 16 deletions xds/server_options.go
Expand Up @@ -22,7 +22,7 @@ import (
"net"

"google.golang.org/grpc"
iserver "google.golang.org/grpc/xds/internal/server"
"google.golang.org/grpc/connectivity"
)

type serverOptions struct {
Expand All @@ -41,20 +41,6 @@ func ServingModeCallback(cb ServingModeCallbackFunc) grpc.ServerOption {
return &serverOption{apply: func(o *serverOptions) { o.modeCallback = cb }}
}

// ServingMode indicates the current mode of operation of the server.
type ServingMode = iserver.ServingMode

const (
// ServingModeServing indicates the the server contains all required xDS
// configuration is serving RPCs.
ServingModeServing = iserver.ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required xDS configuration to serve RPCs.
ServingModeNotServing = iserver.ServingModeNotServing
)

// ServingModeCallbackFunc is the callback that users can register to get
// notified about the server's serving mode changes. The callback is invoked
// with the address of the listener and its new mode.
Expand All @@ -66,7 +52,7 @@ type ServingModeCallbackFunc func(addr net.Addr, args ServingModeChangeArgs)
// function.
type ServingModeChangeArgs struct {
// Mode is the new serving mode of the server listener.
Mode ServingMode
Mode connectivity.ServingMode
// Err is set to a non-nil error if the server has transitioned into
// not-serving mode.
Err error
Expand All @@ -80,6 +66,11 @@ type ServingModeChangeArgs struct {
//
// This function should ONLY be used for testing and may not work with some
// other features, including the CSDS service.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func BootstrapContentsForTesting(contents []byte) grpc.ServerOption {
return &serverOption{apply: func(o *serverOptions) { o.bootstrapContents = contents }}
}

0 comments on commit 4c5f7fb

Please sign in to comment.