Update peering establishment to maybe use gateways #14981

Merged (8 commits, Oct 14, 2022)

Changes from all commits
3 changes: 3 additions & 0 deletions .changelog/14981.txt
@@ -0,0 +1,3 @@
```release-note:feature
peering: add support for routing peering control-plane traffic through mesh gateways
```
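
Not part of the diff, but for orientation while reviewing: the behavior in this release note is opt-in, keyed off the PeerThroughMeshGateways setting on the Mesh config entry (the field name is referenced in the watchAddresses doc comment further down). Below is a minimal stand-in sketch of that flag; the surrounding structure is an assumption made for illustration, not code taken from this PR.

```go
package meshsketch // hypothetical package, not code from this PR

// MeshConfigEntry is a trimmed-down stand-in for Consul's Mesh config entry.
// Only the PeerThroughMeshGateways name comes from this PR's comments; the
// surrounding shape is assumed for illustration.
type MeshConfigEntry struct {
	Peering struct {
		// When true, peering control-plane traffic is routed through local
		// mesh gateways: their addresses are tried first, with the remote
		// peer server addresses kept as a fallback (see the watchAddresses
		// doc comment in agent/consul/leader_peering.go below).
		PeerThroughMeshGateways bool
	}
}
```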
3 changes: 2 additions & 1 deletion acl/acl_oss.go
@@ -4,7 +4,8 @@
package acl

const (
DefaultPartitionName = ""
WildcardPartitionName = ""
DefaultPartitionName = ""
)

// Reviewer Note: This is a little bit strange; one might want it to be "" like partition name
132 changes: 70 additions & 62 deletions agent/consul/leader_peering.go
@@ -243,11 +243,22 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
continue
}

status, found := s.peerStreamServer.StreamStatus(peer.ID)
// We may have written this peering to the store to trigger xDS updates, but it may still be in the process of being established.
// If there isn't a secret yet, we're still trying to reach the other server.
logger.Trace("reading peering secret", "sequence_id", seq)
secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID)
if err != nil {
return fmt.Errorf("failed to read secret for peering: %w", err)
}
if secret.GetStream().GetActiveSecretID() == "" {
continue
}

// TODO(peering): If there is new peering data and a connected stream, should we tear down the stream?
status, found := s.peerStreamServer.StreamStatus(peer.ID)
if found && status.Connected {
// Nothing to do when we already have an active stream to the peer.
// Updated data will only be used if the stream becomes disconnected
// since there's no need to tear down an active stream.
continue
}
logger.Trace("ensuring stream to peer", "peer_id", peer.ID, "sequence_id", seq)
@@ -259,7 +270,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
cancel()
}

if err := s.establishStream(ctx, logger, ws, peer, cancelFns); err != nil {
if err := s.establishStream(ctx, logger, peer, secret, cancelFns); err != nil {
// TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs.
// Lockable status isn't available here though. Could report it via the peering.Service?
logger.Error("error establishing peering stream", "peer_id", peer.ID, "error", err)
@@ -273,7 +284,6 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq)

// Clean up active streams of peerings that were deleted from the state store.
// TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK?
for stream, doneCh := range connectedStreams {
if _, ok := stored[stream]; ok {
// Active stream is in the state store, nothing to do.
@@ -298,7 +308,11 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
return merr.ErrorOrNil()
}

func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws memdb.WatchSet, peer *pbpeering.Peering, cancelFns map[string]context.CancelFunc) error {
func (s *Server) establishStream(ctx context.Context,
logger hclog.Logger,
peer *pbpeering.Peering,
secret *pbpeering.PeeringSecrets,
cancelFns map[string]context.CancelFunc) error {
logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID)

if peer.PeerID == "" {
@@ -310,10 +324,6 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
return fmt.Errorf("failed to build TLS dial option from peering: %w", err)
}

secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID)
if err != nil {
return fmt.Errorf("failed to read secret for peering: %w", err)
}
if secret.GetStream().GetActiveSecretID() == "" {
return errors.New("missing stream secret for peering stream authorization, peering must be re-established")
}
@@ -331,15 +341,21 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
// Start a goroutine to watch for updates to peer server addresses.
// The latest valid server address can be received from nextServerAddr.
nextServerAddr := make(chan string)
go s.watchPeerServerAddrs(streamCtx, peer, nextServerAddr)
go s.watchAddresses(streamCtx, peer.ID, nextServerAddr)

// Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes.
go retryLoopBackoffPeering(streamCtx, logger, func() error {
// Try a new address on each iteration by advancing the ring buffer on errors.
addr := <-nextServerAddr
addr, stillOpen := <-nextServerAddr
if !stillOpen {
// If the channel was closed that means the context was canceled, so we return.
return streamCtx.Err()
}

opts := []grpc.DialOption{
tlsOption,
// TODO(peering): Use a grpc.WithStatsHandler here.
// This should wait until the grpc-external server is wired up with a stats handler in NET-50.
// For keep alive parameters there is a larger comment in ClientConnPool.dial about that.
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
@@ -349,7 +365,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
}),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)),
}
// TODO(peering): use a grpc.WithStatsHandler here?)

logger.Trace("dialing peer", "addr", addr)
conn, err := grpc.DialContext(streamCtx, addr, opts...)

@@ -400,83 +416,75 @@ }, func(err error) {
}, func(err error) {
// TODO(peering): why are we using TrackSendError here? This could also be a receive error.
streamStatus.TrackSendError(err.Error())
if isErrCode(err, codes.FailedPrecondition) {

switch {
case isErrCode(err, codes.FailedPrecondition):
logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting",
"error", err)
return
} else if isErrCode(err, codes.ResourceExhausted) {

case isErrCode(err, codes.ResourceExhausted):
logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting",
"error", err)
return

case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
logger.Debug("stream context was canceled", "error", err)

case err != nil:
logger.Error("error managing peering stream", "error", err)
}
logger.Error("error managing peering stream", "error", err)
}, peeringRetryTimeout)

return nil
}
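
Reviewer aid, not part of the diff: the stream above is driven by retryLoopBackoffPeering, and the two closures passed to it are its operation and error-handler arguments. The helper's real implementation lives elsewhere in this package; the sketch below is only a simplified, hypothetical illustration of the loop shape (run the operation, report failures, back off, stop when the context is done), with peeringRetryTimeout standing in as the wait policy.

```go
package retrysketch // hypothetical package, not code from this PR

import (
	"context"
	"time"
)

// retryLoop is a simplified stand-in for retryLoopBackoffPeering. It runs op
// until ctx is done, reports every failure to onErr, and sleeps between
// attempts for whatever the wait policy returns (in the real code,
// peeringRetryTimeout decides this from the failure count and the error).
func retryLoop(ctx context.Context, op func() error, onErr func(error),
	wait func(failures uint) time.Duration) {
	var failures uint
	for {
		if err := op(); err != nil {
			failures++
			onErr(err)
		} else {
			failures = 0
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(wait(failures)):
		}
	}
}
```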

// watchPeerServerAddrs sends an up-to-date peer server address to nextServerAddr.
// It loads the server addresses into a ring buffer and cycles through them until:
// 1. streamCtx is cancelled (peer is deleted)
// 2. the peer is modified and the watchset fires.
// watchAddresses sends an up-to-date address to nextServerAddr.
// These could be either remote peer server addresses or local mesh gateway addresses.
// The function loads the addresses into a ring buffer and cycles through them until:
// 1. streamCtx is cancelled (peer is deleted or we're re-establishing the stream with new data)
// 2. the peer, Mesh config entry, or (optionally) mesh gateway address set is modified, and the watchset fires.
//
// In case (2) we refetch the peering and rebuild the ring buffer.
func (s *Server) watchPeerServerAddrs(ctx context.Context, peer *pbpeering.Peering, nextServerAddr chan<- string) {
// In case (2) we re-fetch all the data sources and rebuild the ring buffer.
// In the event that PeerThroughMeshGateways is set in the Mesh entry, we front-load the ring buffer with
// local mesh gateway addresses, so we can try those first, with the option to fall back to remote server addresses.
func (s *Server) watchAddresses(ctx context.Context, peerID string, nextServerAddr chan<- string) {
defer close(nextServerAddr)

// we initialize the ring buffer with the peer passed to `establishStream`
// because the caller has pre-checked `peer.ShouldDial`, guaranteeing
// at least one server address.
//
// IMPORTANT: ringbuf must always be length > 0 or else `<-nextServerAddr` may block.
ringbuf := ring.New(len(peer.PeerServerAddresses))
for _, addr := range peer.PeerServerAddresses {
ringbuf.Value = addr
ringbuf = ringbuf.Next()
}
innerWs := memdb.NewWatchSet()
_, _, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID)
if err != nil {
s.logger.Warn("failed to watch for changes to peer; server addresses may become stale over time.",
"peer_id", peer.ID,
"error", err)
}
var ringbuf *ring.Ring
var ws memdb.WatchSet

fetchAddrs := func() error {
// reinstantiate innerWs to prevent it from growing indefinitely
innerWs = memdb.NewWatchSet()
_, peering, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID)
fetchAddresses := func() error {
// Re-instantiate ws since it can only be watched once.
ws = memdb.NewWatchSet()

newRing, _, err := s.peeringBackend.GetDialAddresses(s.logger, ws, peerID)
if err != nil {
return fmt.Errorf("failed to fetch peer %q: %w", peer.ID, err)
}
if !peering.IsActive() {
return fmt.Errorf("peer %q is no longer active", peer.ID)
}
if len(peering.PeerServerAddresses) == 0 {
return fmt.Errorf("peer %q has no addresses to dial", peer.ID)
return fmt.Errorf("failed to fetch updated addresses to dial peer: %w", err)
}
ringbuf = newRing

ringbuf = ring.New(len(peering.PeerServerAddresses))
for _, addr := range peering.PeerServerAddresses {
ringbuf.Value = addr
ringbuf = ringbuf.Next()
}
return nil
}

// Initialize the first ring buffer.
if err := fetchAddresses(); err != nil {
s.logger.Warn("error fetching addresses", "peer_id", peerID, "error", err)
}

for {
select {
case nextServerAddr <- ringbuf.Value.(string):
ringbuf = ringbuf.Next()
case err := <-innerWs.WatchCh(ctx):

case err := <-ws.WatchCh(ctx):
if err != nil {
// context was cancelled
// Context was cancelled.
return
}
// watch fired so we refetch the peering and rebuild the ring buffer
if err := fetchAddrs(); err != nil {
s.logger.Warn("watchset for peer was fired but failed to update server addresses",
"peer_id", peer.ID,

// Watch fired so we re-fetch the necessary addresses and replace the ring buffer.
if err := fetchAddresses(); err != nil {
s.logger.Warn("watch for new addresses fired but the address list to dial may not have been updated",
"peer_id", peerID,
"error", err)
}
}
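
To make the watchAddresses / establishStream hand-off concrete, here is a self-contained sketch of the pattern the two functions rely on: a producer keeps a container/ring of candidate addresses (local mesh gateway addresses front-loaded ahead of the remote server addresses when peering through mesh gateways is enabled), sends them on a channel, and closes that channel once its context is cancelled; the consumer detects the close through the channel's second return value, just like the `addr, stillOpen := <-nextServerAddr` branch in establishStream. All names and addresses below are illustrative, not taken from the diff.

```go
package main

import (
	"container/ring"
	"context"
	"fmt"
)

// buildRing front-loads gateway addresses ahead of server addresses, mirroring
// the ordering described in the watchAddresses doc comment. The real code
// guarantees at least one address, so the ring is never empty.
func buildRing(gateways, servers []string) *ring.Ring {
	addrs := append(append([]string{}, gateways...), servers...)
	r := ring.New(len(addrs))
	for _, a := range addrs {
		r.Value = a
		r = r.Next()
	}
	return r
}

// watchAddrs cycles through the ring, handing out one address per receive, and
// closes next when ctx is cancelled so the consumer can stop cleanly.
func watchAddrs(ctx context.Context, r *ring.Ring, next chan<- string) {
	defer close(next)
	for {
		select {
		case next <- r.Value.(string):
			r = r.Next()
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	next := make(chan string)

	go watchAddrs(ctx, buildRing(
		[]string{"10.0.0.1:8443"},     // local mesh gateway, tried first
		[]string{"203.0.113.10:8502"}, // remote peer server, fallback
	), next)

	for i := 0; i < 3; i++ {
		fmt.Println("would dial", <-next)
	}

	cancel() // producer sees ctx.Done and closes next
	for range next {
		// drain anything sent before the close
	}
	fmt.Println("address channel closed after cancellation")
}
```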