Add exported services event to cluster peering replication.
erichaberkorn committed Sep 29, 2022
1 parent 69f40df commit f014d72
Showing 28 changed files with 1,264 additions and 642 deletions.
3 changes: 3 additions & 0 deletions .changelog/14797.txt
@@ -0,0 +1,3 @@
```release-note:feature
peering: Ensure un-exported services get deleted even if the un-export happens while cluster peering replication is down.
```
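
At a high level, the mechanism behind this change: the acceptor replicates the complete list of exported service names to the dialing peer, and the dialer reconciles its imported state against that list, deleting anything the peer no longer exports, even when the un-export happened while replication was down. The sketch below only illustrates that reconciliation idea; the function and type names are hypothetical, not the commit's actual API.

```go
package main

import "fmt"

// reconcileExportedServices illustrates the replicate-then-reconcile idea:
// the acceptor sends the full set of currently-exported service names, and
// the dialer deletes any locally imported service that is absent from it.
// All names here are illustrative stand-ins, not Consul's real types.
func reconcileExportedServices(local map[string]struct{}, exported []string) (deleted []string) {
	// Build a set of what the peer currently exports.
	want := make(map[string]struct{}, len(exported))
	for _, name := range exported {
		want[name] = struct{}{}
	}
	// Anything held locally but no longer exported gets removed.
	for name := range local {
		if _, ok := want[name]; !ok {
			delete(local, name)
			deleted = append(deleted, name)
		}
	}
	return deleted
}

func main() {
	local := map[string]struct{}{"foo": {}, "bar": {}}
	// The peer now exports only "bar"; "foo" was un-exported while we were down.
	fmt.Println(reconcileExportedServices(local, []string{"bar"})) // [foo]
}
```

Because the acceptor sends the full set rather than individual deltas, a dialer that missed any number of events converges to the correct state on the next replication.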
200 changes: 195 additions & 5 deletions agent/consul/leader_peering_test.go
@@ -12,6 +12,7 @@ import (
"time"

"github.com/armon/go-metrics"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -162,6 +163,191 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS bool
})
}

func TestLeader_PeeringSync_Lifecycle_UnexportWhileDown(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

// Reserve a gRPC port so we can restart the dialing server with the same port.
ports := freeport.GetN(t, 1)
dialingServerPort := ports[0]

_, acceptor := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptor"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
})
testrpc.WaitForLeader(t, acceptor.RPC, "dc1")

// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

acceptorClient := pbpeering.NewPeeringServiceClient(conn)

req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialer",
}
resp, err := acceptorClient.GenerateToken(ctx, &req)
require.NoError(t, err)

tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)

var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))

// The acceptor should not have a stream tracked yet because it generated the token for my-peer-dialer, and therefore waits to be dialed.
time.Sleep(1 * time.Second)
_, found := acceptor.peerStreamServer.StreamStatus(token.PeerID)
require.False(t, found)

// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
_, dialer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc1"
c.GRPCPort = dialingServerPort
})
testrpc.WaitForLeader(t, dialer.RPC, "dc1")

// Create the peering at the dialer by establishing it with the acceptor's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

dialerClient := pbpeering.NewPeeringServiceClient(conn)

establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: resp.PeeringToken,
}
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)

p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)

retry.Run(t, func(r *retry.R) {
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
require.True(r, status.Connected)
})

retry.Run(t, func(r *retry.R) {
status, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID)
require.True(r, found)
require.True(r, status.Connected)
})

acceptorCodec := rpcClient(t, acceptor)
{
exportedServices := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "foo",
Consumers: []structs.ServiceConsumer{{PeerName: "my-peer-dialer"}},
},
},
},
}
var configOutput bool
require.NoError(t, msgpackrpc.CallWithCodec(acceptorCodec, "ConfigEntry.Apply", &exportedServices, &configOutput))
require.True(t, configOutput)
}

insertNode := func(i int) {
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: fmt.Sprintf("node%d", i+1),
Address: fmt.Sprintf("127.0.0.%d", i+1),
NodeMeta: map[string]string{
"group": fmt.Sprintf("%d", i/5),
"instance_type": "t2.micro",
},
Service: &structs.NodeService{
Service: "foo",
Port: 8000,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}

var reply struct{}
if err := msgpackrpc.CallWithCodec(acceptorCodec, "Catalog.Register", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
}

for i := 0; i < 5; i++ {
insertNode(i)
}

retry.Run(t, func(r *retry.R) {
_, nodes, err := dialer.fsm.State().CheckServiceNodes(nil, "foo", nil, "my-peer-acceptor")
require.NoError(r, err)
require.Len(r, nodes, 5)
})

// Shut down the dialing server.
require.NoError(t, dialer.Shutdown())

// Have to manually shut down the gRPC server otherwise it stays bound to the port.
dialer.externalGRPCServer.Stop()

{
exportedServices := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{},
},
}
var configOutput bool
require.NoError(t, msgpackrpc.CallWithCodec(acceptorCodec, "ConfigEntry.Apply", &exportedServices, &configOutput))
require.True(t, configOutput)
}

// Restart the dialing server, re-using the previous dialer's data directory and node ID.
_, dialerRestart := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.GRPCPort = dialingServerPort
c.DataDir = dialer.config.DataDir
c.NodeID = dialer.config.NodeID
})

// The dialing peer should eventually reconnect.
retry.Run(t, func(r *retry.R) {
connStreams := dialerRestart.peerStreamServer.ConnectedStreams()
require.Contains(r, connStreams, p.Peering.ID)
})

// The un-export results in the foo nodes being deleted.
retry.Run(t, func(r *retry.R) {
_, nodes, err := dialerRestart.fsm.State().CheckServiceNodes(nil, "foo", nil, "my-peer-acceptor")
require.NoError(r, err)
require.Len(r, nodes, 0)
})
}

func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
t.Run("without-tls", func(t *testing.T) {
testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t, false)
@@ -1061,17 +1247,21 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
require.NoError(t, err)

// mimic tracking exported services
mst1.TrackExportedService(structs.ServiceName{Name: "a-service"})
mst1.TrackExportedService(structs.ServiceName{Name: "b-service"})
mst1.TrackExportedService(structs.ServiceName{Name: "c-service"})
mst1.SetExportedServices([]structs.ServiceName{
{Name: "a-service"},
{Name: "b-service"},
{Name: "c-service"},
})

// connect the stream
mst2, err := s2.peeringServer.Tracker.Connected(s2PeerID2)
require.NoError(t, err)

// mimic tracking exported services
mst2.TrackExportedService(structs.ServiceName{Name: "d-service"})
mst2.TrackExportedService(structs.ServiceName{Name: "e-service"})
mst2.SetExportedServices([]structs.ServiceName{
{Name: "d-service"},
{Name: "e-service"},
})

// pretend that the heartbeat happened
mst2.TrackRecvHeartbeat()
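
The hunk above replaces the incremental TrackExportedService calls with a single SetExportedServices call that takes the whole list. Replacing the tracked set wholesale, rather than appending to it, is what allows an un-export to be observed at all. Below is a minimal sketch of such a setter, with simplified stand-ins for the tracker types; the commit's real MutableStatus and structs.ServiceName differ.

```go
package main

import (
	"fmt"
	"sync"
)

// ServiceName is a stand-in for structs.ServiceName.
type ServiceName struct{ Name string }

// MutableStatus sketches the per-peer stream status. Only the
// set-replacement semantics matter here: services missing from the
// new list are dropped, unlike with an append-only Track method.
type MutableStatus struct {
	mu               sync.Mutex
	exportedServices map[string]struct{}
}

// SetExportedServices replaces the tracked set wholesale.
func (m *MutableStatus) SetExportedServices(services []ServiceName) {
	next := make(map[string]struct{}, len(services))
	for _, s := range services {
		next[s.Name] = struct{}{}
	}
	m.mu.Lock()
	m.exportedServices = next
	m.mu.Unlock()
}

// ExportedServiceCount reports how many services are currently tracked.
func (m *MutableStatus) ExportedServiceCount() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.exportedServices)
}

func main() {
	m := &MutableStatus{}
	m.SetExportedServices([]ServiceName{{Name: "a-service"}, {Name: "b-service"}})
	m.SetExportedServices([]ServiceName{{Name: "a-service"}}) // "b-service" un-exported
	fmt.Println(m.ExportedServiceCount())                     // 1
}
```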
2 changes: 2 additions & 0 deletions agent/consul/state/peering.go
@@ -588,6 +588,8 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) error
// We may need to avoid clobbering existing values.
req.Peering.ImportedServiceCount = existing.ImportedServiceCount
req.Peering.ExportedServiceCount = existing.ExportedServiceCount
req.Peering.ImportedServices = existing.ImportedServices
req.Peering.ExportedServices = existing.ExportedServices
req.Peering.CreateIndex = existing.CreateIndex
req.Peering.ModifyIndex = idx
} else {
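
The two added lines extend the carry-forward pattern already present in PeeringWrite: fields that the incoming write request does not supply are copied from the existing record, so an unrelated peering update cannot clobber the replicated service lists. A minimal illustration of that pattern, using a simplified stand-in for the pbpeering.Peering message:

```go
package main

import "fmt"

// Peering is a simplified stand-in for pbpeering.Peering.
type Peering struct {
	ImportedServiceCount uint64
	ExportedServiceCount uint64
	ImportedServices     []string
	ExportedServices     []string
}

// carryForward mirrors the clobber-avoidance in PeeringWrite: values the
// caller did not populate are preserved from the existing record.
func carryForward(req, existing *Peering) {
	req.ImportedServiceCount = existing.ImportedServiceCount
	req.ExportedServiceCount = existing.ExportedServiceCount
	req.ImportedServices = existing.ImportedServices
	req.ExportedServices = existing.ExportedServices
}

func main() {
	existing := &Peering{ExportedServices: []string{"foo"}, ExportedServiceCount: 1}
	req := &Peering{} // e.g. a state-only update that omits the service lists
	carryForward(req, existing)
	fmt.Println(req.ExportedServices) // [foo]
}
```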
