Skip to content

Commit

Permalink
Add exported services event to cluster peering replication. (#14797)
Browse files Browse the repository at this point in the history
  • Loading branch information
erichaberkorn committed Sep 29, 2022
1 parent fdd6e9d commit 80e51ff
Show file tree
Hide file tree
Showing 28 changed files with 1,257 additions and 723 deletions.
3 changes: 3 additions & 0 deletions .changelog/14797.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
peering: Ensure un-exported services get deleted even if the un-export happens while cluster peering replication is down.
```
211 changes: 200 additions & 11 deletions agent/consul/leader_peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/armon/go-metrics"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -162,6 +163,186 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS boo
})
}

func TestLeader_PeeringSync_Lifecycle_UnexportWhileDown(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

// Reserve a gRPC port so we can restart the accepting server with the same port.
ports := freeport.GetN(t, 1)
dialingServerPort := ports[0]

_, acceptor := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptor"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
})
testrpc.WaitForLeader(t, acceptor.RPC, "dc1")

// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

acceptorClient := pbpeering.NewPeeringServiceClient(conn)

req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialer",
}
resp, err := acceptorClient.GenerateToken(ctx, &req)
require.NoError(t, err)

tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)

var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))

// Bring up dialer and establish a peering with acceptor's token so that it attempts to dial.
_, dialer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc1"
c.GRPCPort = dialingServerPort
})
testrpc.WaitForLeader(t, dialer.RPC, "dc1")

// Create a peering at dialer by establishing a peering with acceptor's token
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

dialerClient := pbpeering.NewPeeringServiceClient(conn)

establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: resp.PeeringToken,
}
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)

p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)

retry.Run(t, func(r *retry.R) {
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
require.True(r, status.Connected)
})

retry.Run(t, func(r *retry.R) {
status, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID)
require.True(r, found)
require.True(r, status.Connected)
})

acceptorCodec := rpcClient(t, acceptor)
{
exportedServices := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "foo",
Consumers: []structs.ServiceConsumer{{PeerName: "my-peer-dialer"}},
},
},
},
}
var configOutput bool
require.NoError(t, msgpackrpc.CallWithCodec(acceptorCodec, "ConfigEntry.Apply", &exportedServices, &configOutput))
require.True(t, configOutput)
}

insertNode := func(i int) {
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: fmt.Sprintf("node%d", i+1),
Address: fmt.Sprintf("127.0.0.%d", i+1),
NodeMeta: map[string]string{
"group": fmt.Sprintf("%d", i/5),
"instance_type": "t2.micro",
},
Service: &structs.NodeService{
Service: "foo",
Port: 8000,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}

var reply struct{}
if err := msgpackrpc.CallWithCodec(acceptorCodec, "Catalog.Register", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
}

for i := 0; i < 5; i++ {
insertNode(i)
}

retry.Run(t, func(r *retry.R) {
_, nodes, err := dialer.fsm.State().CheckServiceNodes(nil, "foo", nil, "my-peer-acceptor")
require.NoError(r, err)
require.Len(r, nodes, 5)
})

// Shutdown the dialing server.
require.NoError(t, dialer.Shutdown())

// Have to manually shut down the gRPC server otherwise it stays bound to the port.
dialer.externalGRPCServer.Stop()

{
exportedServices := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{},
},
}
var configOutput bool
require.NoError(t, msgpackrpc.CallWithCodec(acceptorCodec, "ConfigEntry.Apply", &exportedServices, &configOutput))
require.True(t, configOutput)
}

// Restart the server by re-using the previous acceptor's data directory and node id.
_, dialerRestart := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.GRPCPort = dialingServerPort
c.DataDir = dialer.config.DataDir
c.NodeID = dialer.config.NodeID
})

// The dialing peer should eventually reconnect.
retry.Run(t, func(r *retry.R) {
connStreams := dialerRestart.peerStreamServer.ConnectedStreams()
require.Contains(r, connStreams, p.Peering.ID)
})

// The un-export results in the foo nodes being deleted.
retry.Run(t, func(r *retry.R) {
_, nodes, err := dialerRestart.fsm.State().CheckServiceNodes(nil, "foo", nil, "my-peer-acceptor")
require.NoError(r, err)
require.Len(r, nodes, 0)
})
}

func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
t.Run("without-tls", func(t *testing.T) {
testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t, false)
Expand Down Expand Up @@ -818,8 +999,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
name string
description string
exportedService structs.ExportedServicesConfigEntry
expectedImportedServsCount uint64
expectedExportedServsCount uint64
expectedImportedServsCount int
expectedExportedServsCount int
}

testCases := []testCase{
Expand Down Expand Up @@ -946,13 +1127,15 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
resp, err := peeringClient2.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
require.NoError(r, err)
require.NotNil(r, resp.Peering)
require.Equal(r, tc.expectedImportedServsCount, resp.Peering.ImportedServiceCount)
require.Equal(r, tc.expectedImportedServsCount, int(resp.Peering.ImportedServiceCount))
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.ImportedServices))

// on List
resp2, err2 := peeringClient2.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
require.NoError(r, err2)
require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedExportedServsCount, resp2.Peerings[0].ImportedServiceCount)
require.Equal(r, tc.expectedExportedServsCount, int(resp2.Peerings[0].ImportedServiceCount))
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].ImportedServices))
})

// Check that exported services count on S1 are what we expect
Expand All @@ -961,13 +1144,15 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
resp, err := peeringClient.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s2"})
require.NoError(r, err)
require.NotNil(r, resp.Peering)
require.Equal(r, tc.expectedImportedServsCount, resp.Peering.ExportedServiceCount)
require.Equal(r, tc.expectedImportedServsCount, int(resp.Peering.ExportedServiceCount))
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.ExportedServices))

// on List
resp2, err2 := peeringClient.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
require.NoError(r, err2)
require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedExportedServsCount, resp2.Peerings[0].ExportedServiceCount)
require.Equal(r, tc.expectedExportedServsCount, int(resp2.Peerings[0].ExportedServiceCount))
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].ExportedServices))
})
})
}
Expand Down Expand Up @@ -1061,17 +1246,21 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
require.NoError(t, err)

// mimic tracking exported services
mst1.TrackExportedService(structs.ServiceName{Name: "a-service"})
mst1.TrackExportedService(structs.ServiceName{Name: "b-service"})
mst1.TrackExportedService(structs.ServiceName{Name: "c-service"})
mst1.SetExportedServices([]structs.ServiceName{
{Name: "a-service"},
{Name: "b-service"},
{Name: "c-service"},
})

// connect the stream
mst2, err := s2.peeringServer.Tracker.Connected(s2PeerID2)
require.NoError(t, err)

// mimic tracking exported services
mst2.TrackExportedService(structs.ServiceName{Name: "d-service"})
mst2.TrackExportedService(structs.ServiceName{Name: "e-service"})
mst2.SetExportedServices([]structs.ServiceName{
{Name: "d-service"},
{Name: "e-service"},
})

// pretend that the hearbeat happened
mst2.TrackRecvHeartbeat()
Expand Down
2 changes: 2 additions & 0 deletions agent/consul/state/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,8 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
// We may need to avoid clobbering existing values.
req.Peering.ImportedServiceCount = existing.ImportedServiceCount
req.Peering.ExportedServiceCount = existing.ExportedServiceCount
req.Peering.ImportedServices = existing.ImportedServices
req.Peering.ExportedServices = existing.ExportedServices
req.Peering.CreateIndex = existing.CreateIndex
req.Peering.ModifyIndex = idx
} else {
Expand Down

0 comments on commit 80e51ff

Please sign in to comment.