Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: de-experimentalize xDS apis required for psm security #4753

Merged
merged 5 commits into from Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 29 additions & 1 deletion connectivity/connectivity.go
Expand Up @@ -18,7 +18,6 @@

// Package connectivity defines connectivity semantics.
// For details, see https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md.
// All APIs in this package are experimental.
package connectivity

import (
Expand Down Expand Up @@ -61,3 +60,32 @@ const (
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)

// ServingMode indicates the current mode of operation of the server.
//
// Only xDS enabled gRPC servers currently report their serving mode.
type ServingMode int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.


This actually promotes an xDS only concept to a gRPC concept.

The concern is that in the future we may change the meanings of these terms in a non-xDS gRPC server. But with the 3 values we have now, that seems unlikely.

  • non-xDS server doesn't have starting with our current API. If our API is to take a port instead of a listener, the state after NewServer, before the net.Listen() would be ServingModeStarting
  • serving is serving
  • NotServing is after gracefulstop, before stop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • non-xDS server doesn't have starting with our current API. If our API is to take a port instead of a listener, the state after NewServer, before the net.Listen() would be ServingModeStarting

Even with our current API taking a net.Listener, we could have a starting state after NewServer and before we call Accept(). It is very fleeting though.

  • NotServing is after gracefulstop, before stop?

Sounds about right.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds about right.

What happens at "stop", then? It no longer has any serving mode? I'd think the NotServing state continues after "stop", too.

Copy link
Contributor

@menghanl menghanl Sep 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a starting. So should we also add a stopped (definitely not stopping).

  • NotServing means no new streams?
  • Stopped means no streams?

Or we can add them when needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds about right.

What happens at "stop", then? It no longer has any serving mode? I'd think the NotServing state continues after "stop", too.

The only difference I wanted to highlight was that in NotServing mode, we accept and close the connection without performing any I/O (this is a limitation of our API, since we accept a net.Listener). Once stop is complete, we don't even accept the connection.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NotServing means no new streams?

Or "no new connections" is what I thought. Existing connections may allow new streams.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"no new streams" and "no new connections" are different.
GracefulStop also sends GOAWAYs, so it won't accept new streams.

completely-NotServing vs almost-NotServing? 🙈


const (
// ServingModeStarting indicates that the server is starting up.
ServingModeStarting ServingMode = iota
// ServingModeServing indicates that the server contains all required
// configuration and is serving RPCs.
ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required configuration to serve RPCs.
ServingModeNotServing
)

func (s ServingMode) String() string {
switch s {
case ServingModeNotServing:
return "not-serving"
case ServingModeServing:
return "serving"
default:
return "starting"
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shows some inconsistencies now between here and Connectivity.State. Should this be "NOT_SERVING", "SERVING", explicit "STARTING", and default: "Invalid-State" (or modify the above to INVALID_STATE" & match here)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
5 changes: 0 additions & 5 deletions credentials/xds/xds.go
Expand Up @@ -18,11 +18,6 @@

// Package xds provides a transport credentials implementation where the
// security configuration is pushed by a management server using xDS APIs.
//
// Experimental
//
// Notice: All APIs in this package are EXPERIMENTAL and may be removed in a
// later release.
package xds

import (
Expand Down
49 changes: 10 additions & 39 deletions xds/internal/server/listener_wrapper.go
Expand Up @@ -30,6 +30,7 @@ import (
"unsafe"

"google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
internalbackoff "google.golang.org/grpc/internal/backoff"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
Expand All @@ -51,41 +52,11 @@ var (
backoffFunc = bs.Backoff
)

// ServingMode indicates the current mode of operation of the server.
//
// This API exactly mirrors the one in the public xds package. We have to
// redefine it here to avoid a cyclic dependency.
type ServingMode int

const (
// ServingModeStarting indicates that the serving is starting up.
ServingModeStarting ServingMode = iota
// ServingModeServing indicates the the server contains all required xDS
// configuration is serving RPCs.
ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required xDS configuration to serve RPCs.
ServingModeNotServing
)

func (s ServingMode) String() string {
switch s {
case ServingModeNotServing:
return "not-serving"
case ServingModeServing:
return "serving"
default:
return "starting"
}
}

// ServingModeCallback is the callback that users can register to get notified
// about the server's serving mode changes. The callback is invoked with the
// address of the listener and its new mode. The err parameter is set to a
// non-nil error if the server has transitioned into not-serving mode.
type ServingModeCallback func(addr net.Addr, mode ServingMode, err error)
type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err error)

// DrainCallback is the callback that an xDS-enabled server registers to get
// notified about updates to the Listener configuration. The server is expected
Expand Down Expand Up @@ -208,7 +179,7 @@ type listenerWrapper struct {
// get a Listener resource update).
mu sync.RWMutex
// Current serving mode.
mode ServingMode
mode connectivity.ServingMode
// Filter chains received as part of the last good update.
filterChains *xdsclient.FilterChainManager

Expand Down Expand Up @@ -267,7 +238,7 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
}

l.mu.RLock()
if l.mode == ServingModeNotServing {
if l.mode == connectivity.ServingModeNotServing {
// Close connections as soon as we accept them when we are in
// "not-serving" mode. Since we accept a net.Listener from the user
// in Serve(), we cannot close the listener when we move to
Expand Down Expand Up @@ -390,23 +361,23 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
if update.err != nil {
l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, update.err)
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}
atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates))

l.switchMode(l.filterChains, ServingModeServing, nil)
l.switchMode(l.filterChains, connectivity.ServingModeServing, nil)
l.goodUpdate.Fire()
}

func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
if update.err != nil {
l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, update.err)
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
Expand All @@ -428,7 +399,7 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
// what we have decided to do. See gRPC A36 for more details.
ilc := update.update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
l.switchMode(nil, ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
return
}

Expand All @@ -447,12 +418,12 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
// from the management server, this listener has all the configuration
// needed, and is ready to serve.
if len(ilc.FilterChains.RouteConfigNames) == 0 {
l.switchMode(ilc.FilterChains, ServingModeServing, nil)
l.switchMode(ilc.FilterChains, connectivity.ServingModeServing, nil)
l.goodUpdate.Fire()
}
}

func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode ServingMode, err error) {
func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode connectivity.ServingMode, err error) {
l.mu.Lock()
defer l.mu.Unlock()

Expand Down
33 changes: 17 additions & 16 deletions xds/internal/test/xds_server_serving_mode_test.go
Expand Up @@ -30,6 +30,7 @@ import (

v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
testpb "google.golang.org/grpc/test/grpc_testing"
Expand Down Expand Up @@ -64,8 +65,8 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
}

// Create a couple of channels on which mode updates will be pushed.
updateCh1 := make(chan xds.ServingMode, 1)
updateCh2 := make(chan xds.ServingMode, 1)
updateCh1 := make(chan connectivity.ServingMode, 1)
updateCh2 := make(chan connectivity.ServingMode, 1)

// Create a server option to get notified about serving mode changes, and
// push the updated mode on the channels created above.
Expand Down Expand Up @@ -124,16 +125,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}

Expand Down Expand Up @@ -169,16 +170,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing)
if mode != connectivity.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
}
}

Expand All @@ -203,8 +204,8 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing)
if mode != connectivity.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
}
}

Expand Down Expand Up @@ -233,16 +234,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}

Expand Down
7 changes: 4 additions & 3 deletions xds/server.go
Expand Up @@ -28,6 +28,7 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -231,7 +232,7 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
ListenerResourceName: name,
XDSCredsInUse: s.xdsCredsInUse,
XDSClient: s.xdsC,
ModeCallback: func(addr net.Addr, mode server.ServingMode, err error) {
ModeCallback: func(addr net.Addr, mode connectivity.ServingMode, err error) {
modeUpdateCh.Put(&modeChangeArgs{
addr: addr,
mode: mode,
Expand Down Expand Up @@ -261,7 +262,7 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
// modeChangeArgs wraps argument required for invoking mode change callback.
type modeChangeArgs struct {
addr net.Addr
mode server.ServingMode
mode connectivity.ServingMode
err error
}

Expand All @@ -278,7 +279,7 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
case u := <-updateCh.Get():
updateCh.Load()
args := u.(*modeChangeArgs)
if args.mode == ServingModeNotServing {
if args.mode == connectivity.ServingModeNotServing {
// We type assert our underlying gRPC server to the real
// grpc.Server here before trying to initiate the drain
// operation. This approach avoids performing the same type
Expand Down
23 changes: 7 additions & 16 deletions xds/server_options.go
Expand Up @@ -22,7 +22,7 @@ import (
"net"

"google.golang.org/grpc"
iserver "google.golang.org/grpc/xds/internal/server"
"google.golang.org/grpc/connectivity"
)

type serverOptions struct {
Expand All @@ -41,20 +41,6 @@ func ServingModeCallback(cb ServingModeCallbackFunc) grpc.ServerOption {
return &serverOption{apply: func(o *serverOptions) { o.modeCallback = cb }}
}

// ServingMode indicates the current mode of operation of the server.
type ServingMode = iserver.ServingMode

const (
// ServingModeServing indicates the the server contains all required xDS
// configuration is serving RPCs.
ServingModeServing = iserver.ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required xDS configuration to serve RPCs.
ServingModeNotServing = iserver.ServingModeNotServing
)

// ServingModeCallbackFunc is the callback that users can register to get
// notified about the server's serving mode changes. The callback is invoked
// with the address of the listener and its new mode.
Expand All @@ -66,7 +52,7 @@ type ServingModeCallbackFunc func(addr net.Addr, args ServingModeChangeArgs)
// function.
type ServingModeChangeArgs struct {
// Mode is the new serving mode of the server listener.
Mode ServingMode
Mode connectivity.ServingMode
// Err is set to a non-nil error if the server has transitioned into
// not-serving mode.
Err error
Expand All @@ -80,6 +66,11 @@ type ServingModeChangeArgs struct {
//
// This function should ONLY be used for testing and may not work with some
// other features, including the CSDS service.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func BootstrapContentsForTesting(contents []byte) grpc.ServerOption {
return &serverOption{apply: func(o *serverOptions) { o.bootstrapContents = contents }}
}
13 changes: 7 additions & 6 deletions xds/server_test.go
Expand Up @@ -35,6 +35,7 @@ import (
v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials/tls/certprovider"
"google.golang.org/grpc/credentials/xds"
Expand Down Expand Up @@ -435,8 +436,8 @@ func (s) TestServeSuccess(t *testing.T) {
if err != nil {
t.Fatalf("error when waiting for serving mode to change: %v", err)
}
if mode := v.(ServingMode); mode != ServingModeNotServing {
t.Fatalf("server mode is %q, want %q", mode, ServingModeNotServing)
if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeNotServing {
t.Fatalf("server mode is %q, want %q", mode, connectivity.ServingModeNotServing)
}

// Push a good LDS response, and wait for Serve() to be invoked on the
Expand All @@ -463,8 +464,8 @@ func (s) TestServeSuccess(t *testing.T) {
if err != nil {
t.Fatalf("error when waiting for serving mode to change: %v", err)
}
if mode := v.(ServingMode); mode != ServingModeServing {
t.Fatalf("server mode is %q, want %q", mode, ServingModeServing)
if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeServing {
t.Fatalf("server mode is %q, want %q", mode, connectivity.ServingModeServing)
}

// Push an update to the registered listener watch callback with a Listener
Expand All @@ -489,8 +490,8 @@ func (s) TestServeSuccess(t *testing.T) {
if err != nil {
t.Fatalf("error when waiting for serving mode to change: %v", err)
}
if mode := v.(ServingMode); mode != ServingModeNotServing {
t.Fatalf("server mode is %q, want %q", mode, ServingModeNotServing)
if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeNotServing {
t.Fatalf("server mode is %q, want %q", mode, connectivity.ServingModeNotServing)
}
}

Expand Down