Skip to content

Commit

Permalink
Merge pull request #14981 from hashicorp/peering/dial-through-gateways
Browse files Browse the repository at this point in the history
  • Loading branch information
freddygv committed Oct 14, 2022
2 parents 0c8563f + c77123a commit 24d0c88
Show file tree
Hide file tree
Showing 28 changed files with 1,173 additions and 121 deletions.
3 changes: 3 additions & 0 deletions .changelog/14981.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
peering: add support for routine peering control-plane traffic through mesh gateways
```
3 changes: 2 additions & 1 deletion acl/acl_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
package acl

const (
DefaultPartitionName = ""
WildcardPartitionName = ""
DefaultPartitionName = ""
)

// Reviewer Note: This is a little bit strange; one might want it to be "" like partition name
Expand Down
132 changes: 70 additions & 62 deletions agent/consul/leader_peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,22 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
continue
}

status, found := s.peerStreamServer.StreamStatus(peer.ID)
// We may have written this peering to the store to trigger xDS updates, but still in the process of establishing.
// If there isn't a secret yet, we're still trying to reach the other server.
logger.Trace("reading peering secret", "sequence_id", seq)
secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID)
if err != nil {
return fmt.Errorf("failed to read secret for peering: %w", err)
}
if secret.GetStream().GetActiveSecretID() == "" {
continue
}

// TODO(peering): If there is new peering data and a connected stream, should we tear down the stream?
status, found := s.peerStreamServer.StreamStatus(peer.ID)
if found && status.Connected {
// Nothing to do when we already have an active stream to the peer.
// Updated data will only be used if the stream becomes disconnected
// since there's no need to tear down an active stream.
continue
}
logger.Trace("ensuring stream to peer", "peer_id", peer.ID, "sequence_id", seq)
Expand All @@ -259,7 +270,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
cancel()
}

if err := s.establishStream(ctx, logger, ws, peer, cancelFns); err != nil {
if err := s.establishStream(ctx, logger, peer, secret, cancelFns); err != nil {
// TODO(peering): These errors should be reported in the peer status, otherwise they're only in the logs.
// Lockable status isn't available here though. Could report it via the peering.Service?
logger.Error("error establishing peering stream", "peer_id", peer.ID, "error", err)
Expand All @@ -273,7 +284,6 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
logger.Trace("checking connected streams", "streams", connectedStreams, "sequence_id", seq)

// Clean up active streams of peerings that were deleted from the state store.
// TODO(peering): This is going to trigger shutting down peerings we generated a token for. Is that OK?
for stream, doneCh := range connectedStreams {
if _, ok := stored[stream]; ok {
// Active stream is in the state store, nothing to do.
Expand All @@ -298,7 +308,11 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
return merr.ErrorOrNil()
}

func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws memdb.WatchSet, peer *pbpeering.Peering, cancelFns map[string]context.CancelFunc) error {
func (s *Server) establishStream(ctx context.Context,
logger hclog.Logger,
peer *pbpeering.Peering,
secret *pbpeering.PeeringSecrets,
cancelFns map[string]context.CancelFunc) error {
logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID)

if peer.PeerID == "" {
Expand All @@ -310,10 +324,6 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
return fmt.Errorf("failed to build TLS dial option from peering: %w", err)
}

secret, err := s.fsm.State().PeeringSecretsRead(ws, peer.ID)
if err != nil {
return fmt.Errorf("failed to read secret for peering: %w", err)
}
if secret.GetStream().GetActiveSecretID() == "" {
return errors.New("missing stream secret for peering stream authorization, peering must be re-established")
}
Expand All @@ -331,15 +341,21 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
// Start a goroutine to watch for updates to peer server addresses.
// The latest valid server address can be received from nextServerAddr.
nextServerAddr := make(chan string)
go s.watchPeerServerAddrs(streamCtx, peer, nextServerAddr)
go s.watchAddresses(streamCtx, peer.ID, nextServerAddr)

// Establish a stream-specific retry so that retrying stream/conn errors isn't dependent on state store changes.
go retryLoopBackoffPeering(streamCtx, logger, func() error {
// Try a new address on each iteration by advancing the ring buffer on errors.
addr := <-nextServerAddr
addr, stillOpen := <-nextServerAddr
if !stillOpen {
// If the channel was closed that means the context was canceled, so we return.
return streamCtx.Err()
}

opts := []grpc.DialOption{
tlsOption,
// TODO(peering): Use a grpc.WithStatsHandler here.
// This should wait until the grpc-external server is wired up with a stats handler in NET-50.
// For keep alive parameters there is a larger comment in ClientConnPool.dial about that.
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Expand All @@ -349,7 +365,7 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
}),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(50 * 1024 * 1024)),
}
// TODO(peering): use a grpc.WithStatsHandler here?)

logger.Trace("dialing peer", "addr", addr)
conn, err := grpc.DialContext(streamCtx, addr, opts...)

Expand Down Expand Up @@ -400,83 +416,75 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, ws me
}, func(err error) {
// TODO(peering): why are we using TrackSendError here? This could also be a receive error.
streamStatus.TrackSendError(err.Error())
if isErrCode(err, codes.FailedPrecondition) {

switch {
case isErrCode(err, codes.FailedPrecondition):
logger.Debug("stream disconnected due to 'failed precondition' error; reconnecting",
"error", err)
return
} else if isErrCode(err, codes.ResourceExhausted) {

case isErrCode(err, codes.ResourceExhausted):
logger.Debug("stream disconnected due to 'resource exhausted' error; reconnecting",
"error", err)
return

case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
logger.Debug("stream context was canceled", "error", err)

case err != nil:
logger.Error("error managing peering stream", "error", err)
}
logger.Error("error managing peering stream", "error", err)
}, peeringRetryTimeout)

return nil
}

// watchPeerServerAddrs sends an up-to-date peer server address to nextServerAddr.
// It loads the server addresses into a ring buffer and cycles through them until:
// 1. streamCtx is cancelled (peer is deleted)
// 2. the peer is modified and the watchset fires.
// watchAddresses sends an up-to-date address to nextServerAddr.
// These could be either remote peer server addresses, or local mesh gateways.
// The function loads the addresses into a ring buffer and cycles through them until:
// 1. streamCtx is cancelled (peer is deleted or we're re-establishing the stream with new data)
// 2. the peer, Mesh config entry, or (optionally) mesh gateway address set is modified, and the watchset fires.
//
// In case (2) we refetch the peering and rebuild the ring buffer.
func (s *Server) watchPeerServerAddrs(ctx context.Context, peer *pbpeering.Peering, nextServerAddr chan<- string) {
// In case (2) we re-fetch all the data sources and rebuild the ring buffer.
// In the event that the PeerThroughMeshGateways is set in the Mesh entry, we front-load the ring buffer with
// local mesh gateway addresses, so we can try those first, with the option to fall back to remote server addresses.
func (s *Server) watchAddresses(ctx context.Context, peerID string, nextServerAddr chan<- string) {
defer close(nextServerAddr)

// we initialize the ring buffer with the peer passed to `establishStream`
// because the caller has pre-checked `peer.ShouldDial`, guaranteeing
// at least one server address.
//
// IMPORTANT: ringbuf must always be length > 0 or else `<-nextServerAddr` may block.
ringbuf := ring.New(len(peer.PeerServerAddresses))
for _, addr := range peer.PeerServerAddresses {
ringbuf.Value = addr
ringbuf = ringbuf.Next()
}
innerWs := memdb.NewWatchSet()
_, _, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID)
if err != nil {
s.logger.Warn("failed to watch for changes to peer; server addresses may become stale over time.",
"peer_id", peer.ID,
"error", err)
}
var ringbuf *ring.Ring
var ws memdb.WatchSet

fetchAddrs := func() error {
// reinstantiate innerWs to prevent it from growing indefinitely
innerWs = memdb.NewWatchSet()
_, peering, err := s.fsm.State().PeeringReadByID(innerWs, peer.ID)
fetchAddresses := func() error {
// Re-instantiate ws since it can only be watched once.
ws = memdb.NewWatchSet()

newRing, _, err := s.peeringBackend.GetDialAddresses(s.logger, ws, peerID)
if err != nil {
return fmt.Errorf("failed to fetch peer %q: %w", peer.ID, err)
}
if !peering.IsActive() {
return fmt.Errorf("peer %q is no longer active", peer.ID)
}
if len(peering.PeerServerAddresses) == 0 {
return fmt.Errorf("peer %q has no addresses to dial", peer.ID)
return fmt.Errorf("failed to fetch updated addresses to dial peer: %w", err)
}
ringbuf = newRing

ringbuf = ring.New(len(peering.PeerServerAddresses))
for _, addr := range peering.PeerServerAddresses {
ringbuf.Value = addr
ringbuf = ringbuf.Next()
}
return nil
}

// Initialize the first ring buffer.
if err := fetchAddresses(); err != nil {
s.logger.Warn("error fetching addresses", "peer_id", peerID, "error", err)
}

for {
select {
case nextServerAddr <- ringbuf.Value.(string):
ringbuf = ringbuf.Next()
case err := <-innerWs.WatchCh(ctx):

case err := <-ws.WatchCh(ctx):
if err != nil {
// context was cancelled
// Context was cancelled.
return
}
// watch fired so we refetch the peering and rebuild the ring buffer
if err := fetchAddrs(); err != nil {
s.logger.Warn("watchset for peer was fired but failed to update server addresses",
"peer_id", peer.ID,

// Watch fired so we re-fetch the necessary addresses and replace the ring buffer.
if err := fetchAddresses(); err != nil {
s.logger.Warn("watch for new addresses fired but the address list to dial may not have been updated",
"peer_id", peerID,
"error", err)
}
}
Expand Down

0 comments on commit 24d0c88

Please sign in to comment.