Add Exported Service List Event to Cluster Peering Replication #14797

Merged · 1 commit merged on Sep 29, 2022
3 changes: 3 additions & 0 deletions .changelog/14797.txt
@@ -0,0 +1,3 @@
```release-note:feature
peering: Ensure un-exported services get deleted even if the un-export happens while cluster peering replication is down.
```
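
The release note above describes the guarantee this PR adds: if a service is un-exported while the peering stream is down, the importing side must still delete its copy once the stream resumes. The core idea is that the accepting peer replicates the full list of currently exported services, and the dialer reconciles its imports against that list on reconnect. The sketch below illustrates that reconciliation step only; all names (`reconcileImports`, the plain-string service names) are hypothetical, not Consul's actual types or API.

```go
package sketch

// Sketch only: hypothetical names, not Consul's internals.
// reconcileImports returns the locally imported services that no longer
// appear in the peer's full exported-service list, so the caller can
// delete them even if the un-export event itself was never received.
func reconcileImports(imported map[string]struct{}, exported []string) []string {
	still := make(map[string]struct{}, len(exported))
	for _, name := range exported {
		still[name] = struct{}{}
	}
	var toDelete []string
	for name := range imported {
		if _, ok := still[name]; !ok {
			// Imported while connected, un-exported while we were down:
			// schedule the stale local copy for deletion.
			toDelete = append(toDelete, name)
		}
	}
	return toDelete
}
```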
211 changes: 200 additions & 11 deletions agent/consul/leader_peering_test.go
@@ -12,6 +12,7 @@ import (
"time"

"github.com/armon/go-metrics"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -162,6 +163,186 @@ func testLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T, enableTLS bool
})
}

func TestLeader_PeeringSync_Lifecycle_UnexportWhileDown(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

// Reserve a gRPC port so we can restart the dialing server on the same port.
ports := freeport.GetN(t, 1)
dialingServerPort := ports[0]

_, acceptor := testServerWithConfig(t, func(c *Config) {
c.NodeName = "acceptor"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
})
testrpc.WaitForLeader(t, acceptor.RPC, "dc1")

// Create a peering by generating a token
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

conn, err := grpc.DialContext(ctx, acceptor.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptor.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

acceptorClient := pbpeering.NewPeeringServiceClient(conn)

req := pbpeering.GenerateTokenRequest{
PeerName: "my-peer-dialer",
}
resp, err := acceptorClient.GenerateToken(ctx, &req)
require.NoError(t, err)

tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
require.NoError(t, err)

var token structs.PeeringToken
require.NoError(t, json.Unmarshal(tokenJSON, &token))

// Bring up the dialing server.
_, dialer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc1"
c.GRPCPort = dialingServerPort
})
testrpc.WaitForLeader(t, dialer.RPC, "dc1")

// Establish a peering at the dialer with the acceptor's token so that it attempts to dial.
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)

conn, err = grpc.DialContext(ctx, dialer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(dialer.config.RPCAddr.String())),
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

dialerClient := pbpeering.NewPeeringServiceClient(conn)

establishReq := pbpeering.EstablishRequest{
PeerName: "my-peer-acceptor",
PeeringToken: resp.PeeringToken,
}
_, err = dialerClient.Establish(ctx, &establishReq)
require.NoError(t, err)

p, err := dialerClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-acceptor"})
require.NoError(t, err)

retry.Run(t, func(r *retry.R) {
status, found := dialer.peerStreamServer.StreamStatus(p.Peering.ID)
require.True(r, found)
require.True(r, status.Connected)
})

retry.Run(t, func(r *retry.R) {
status, found := acceptor.peerStreamServer.StreamStatus(p.Peering.PeerID)
require.True(r, found)
require.True(r, status.Connected)
})

acceptorCodec := rpcClient(t, acceptor)
{
exportedServices := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{
{
Name: "foo",
Consumers: []structs.ServiceConsumer{{PeerName: "my-peer-dialer"}},
},
},
},
}
var configOutput bool
require.NoError(t, msgpackrpc.CallWithCodec(acceptorCodec, "ConfigEntry.Apply", &exportedServices, &configOutput))
require.True(t, configOutput)
}

insertNode := func(i int) {
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: fmt.Sprintf("node%d", i+1),
Address: fmt.Sprintf("127.0.0.%d", i+1),
NodeMeta: map[string]string{
"group": fmt.Sprintf("%d", i/5),
"instance_type": "t2.micro",
},
Service: &structs.NodeService{
Service: "foo",
Port: 8000,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}

var reply struct{}
if err := msgpackrpc.CallWithCodec(acceptorCodec, "Catalog.Register", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
}

for i := 0; i < 5; i++ {
insertNode(i)
}

retry.Run(t, func(r *retry.R) {
_, nodes, err := dialer.fsm.State().CheckServiceNodes(nil, "foo", nil, "my-peer-acceptor")
require.NoError(r, err)
require.Len(r, nodes, 5)
})

// Shutdown the dialing server.
require.NoError(t, dialer.Shutdown())

// Have to manually shut down the gRPC server; otherwise it stays bound to the port.
dialer.externalGRPCServer.Stop()

{
exportedServices := structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &structs.ExportedServicesConfigEntry{
Name: "default",
Services: []structs.ExportedService{},
},
}
var configOutput bool
require.NoError(t, msgpackrpc.CallWithCodec(acceptorCodec, "ConfigEntry.Apply", &exportedServices, &configOutput))
require.True(t, configOutput)
}

// Restart the dialing server, re-using the previous dialer's data directory and node ID.
_, dialerRestart := testServerWithConfig(t, func(c *Config) {
c.NodeName = "dialer"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.GRPCPort = dialingServerPort
c.DataDir = dialer.config.DataDir
c.NodeID = dialer.config.NodeID
})

// The dialing peer should eventually reconnect.
retry.Run(t, func(r *retry.R) {
connStreams := dialerRestart.peerStreamServer.ConnectedStreams()
require.Contains(r, connStreams, p.Peering.ID)
})

// The un-export results in the foo nodes being deleted.
retry.Run(t, func(r *retry.R) {
_, nodes, err := dialerRestart.fsm.State().CheckServiceNodes(nil, "foo", nil, "my-peer-acceptor")
require.NoError(r, err)
require.Len(r, nodes, 0)
})
}

func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
t.Run("without-tls", func(t *testing.T) {
testLeader_PeeringSync_Lifecycle_AcceptorDeletion(t, false)
@@ -818,8 +999,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
name string
description string
exportedService structs.ExportedServicesConfigEntry
expectedImportedServsCount uint64
expectedExportedServsCount uint64
expectedImportedServsCount int
expectedExportedServsCount int
}

testCases := []testCase{
@@ -946,13 +1127,15 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
resp, err := peeringClient2.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
require.NoError(r, err)
require.NotNil(r, resp.Peering)
require.Equal(r, tc.expectedImportedServsCount, resp.Peering.ImportedServiceCount)
require.Equal(r, tc.expectedImportedServsCount, int(resp.Peering.ImportedServiceCount))
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.ImportedServices))

// on List
resp2, err2 := peeringClient2.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
require.NoError(r, err2)
require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedExportedServsCount, resp2.Peerings[0].ImportedServiceCount)
require.Equal(r, tc.expectedExportedServsCount, int(resp2.Peerings[0].ImportedServiceCount))
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].ImportedServices))
})

// Check that exported services count on S1 are what we expect
@@ -961,13 +1144,15 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
resp, err := peeringClient.PeeringRead(context.Background(), &pbpeering.PeeringReadRequest{Name: "my-peer-s2"})
require.NoError(r, err)
require.NotNil(r, resp.Peering)
require.Equal(r, tc.expectedImportedServsCount, resp.Peering.ExportedServiceCount)
require.Equal(r, tc.expectedImportedServsCount, int(resp.Peering.ExportedServiceCount))
require.Equal(r, tc.expectedImportedServsCount, len(resp.Peering.ExportedServices))

// on List
resp2, err2 := peeringClient.PeeringList(context.Background(), &pbpeering.PeeringListRequest{})
require.NoError(r, err2)
require.NotEmpty(r, resp2.Peerings)
require.Equal(r, tc.expectedExportedServsCount, resp2.Peerings[0].ExportedServiceCount)
require.Equal(r, tc.expectedExportedServsCount, int(resp2.Peerings[0].ExportedServiceCount))
require.Equal(r, tc.expectedExportedServsCount, len(resp2.Peerings[0].ExportedServices))
})
})
}
@@ -1061,17 +1246,21 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
require.NoError(t, err)

// mimic tracking exported services
mst1.TrackExportedService(structs.ServiceName{Name: "a-service"})
mst1.TrackExportedService(structs.ServiceName{Name: "b-service"})
mst1.TrackExportedService(structs.ServiceName{Name: "c-service"})
mst1.SetExportedServices([]structs.ServiceName{
{Name: "a-service"},
{Name: "b-service"},
{Name: "c-service"},
})

// connect the stream
mst2, err := s2.peeringServer.Tracker.Connected(s2PeerID2)
require.NoError(t, err)

// mimic tracking exported services
mst2.TrackExportedService(structs.ServiceName{Name: "d-service"})
mst2.TrackExportedService(structs.ServiceName{Name: "e-service"})
mst2.SetExportedServices([]structs.ServiceName{
{Name: "d-service"},
{Name: "e-service"},
})

// pretend that the heartbeat happened
mst2.TrackRecvHeartbeat()
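
Note the shift in the metrics test above: the tracker is no longer fed incremental `TrackExportedService` calls but a single `SetExportedServices` call that replaces the whole set. Set-replacement semantics are what keep the exported-service count and list consistent even after missed events. The sketch below shows the shape of that semantics under assumed names; the `MutableStatus` fields and methods here are simplifications, not Consul's actual tracker.

```go
package trackersketch

import "sync"

// ServiceName is a simplified stand-in for structs.ServiceName.
type ServiceName struct{ Name string }

// MutableStatus is a hypothetical, minimal stream-status tracker.
type MutableStatus struct {
	mu       sync.Mutex
	exported []ServiceName
}

// SetExportedServices replaces the tracked set wholesale, so the count
// and the list can never drift apart the way incremental Track calls
// could after a dropped or missed event.
func (s *MutableStatus) SetExportedServices(svcs []ServiceName) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.exported = append(svcs[:0:0], svcs...) // defensive copy
}

// ExportedServiceCount is derived from the list, not stored separately.
func (s *MutableStatus) ExportedServiceCount() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.exported)
}
```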
2 changes: 2 additions & 0 deletions agent/consul/state/peering.go
@@ -588,6 +588,8 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) error
// We may need to avoid clobbering existing values.
req.Peering.ImportedServiceCount = existing.ImportedServiceCount
req.Peering.ExportedServiceCount = existing.ExportedServiceCount
req.Peering.ImportedServices = existing.ImportedServices
req.Peering.ExportedServices = existing.ExportedServices
req.Peering.CreateIndex = existing.CreateIndex
req.Peering.ModifyIndex = idx
} else {
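
The two added lines extend the existing clobber protection in `PeeringWrite`: a write that does not originate from stream replication carries zero values for the derived fields, so they are copied forward from the stored record instead of being overwritten. A minimal sketch of that carry-forward pattern, using a hypothetical, simplified `Peering` shape rather than the real protobuf type:

```go
package statesketch

// Peering is a hypothetical, simplified shape of the peering record.
type Peering struct {
	ImportedServiceCount uint64
	ExportedServiceCount uint64
	ImportedServices     []string
	ExportedServices     []string
	CreateIndex          uint64
	ModifyIndex          uint64
}

// carryForward mirrors the diff above: derived fields on an incoming
// write are replaced with the stored values so that a non-replication
// write cannot clobber the replicated service lists or counts.
func carryForward(existing, req *Peering, idx uint64) {
	req.ImportedServiceCount = existing.ImportedServiceCount
	req.ExportedServiceCount = existing.ExportedServiceCount
	req.ImportedServices = existing.ImportedServices
	req.ExportedServices = existing.ExportedServices
	req.CreateIndex = existing.CreateIndex
	req.ModifyIndex = idx
}
```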