-
Notifications
You must be signed in to change notification settings - Fork 41
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1266 from machi1990/refactor/create-cleanup-and-d…
…eprovisioning-reconcilers-for-clusters refactor(cluster-reconciler): create a cleanup and deprovisioning clusters reconcilers
- Loading branch information
Showing
14 changed files
with
799 additions
and
512 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
30 changes: 30 additions & 0 deletions
30
...grations/20220829180000_add_cleanup_cluster_external_resources_worker_to_leader_leases.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package migrations | ||
|
||
import ( | ||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/api" | ||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db" | ||
"github.com/go-gormigrate/gormigrate/v2" | ||
"gorm.io/gorm" | ||
) | ||
|
||
func addCleanupClusterExternalResourcesWorkerToLeaderLeases() *gormigrate.Migration { | ||
cleanupClustersWorkerType := "cleanup_clusters" | ||
|
||
return &gormigrate.Migration{ | ||
ID: "20220829180000", | ||
Migrate: func(tx *gorm.DB) error { | ||
if err := tx.Create(&api.LeaderLease{Expires: &db.KafkaAdditionalLeasesExpireTime, LeaseType: cleanupClustersWorkerType, Leader: api.NewID()}).Error; err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
}, | ||
Rollback: func(tx *gorm.DB) error { | ||
err := tx.Unscoped().Where("lease_type = ?", cleanupClustersWorkerType).Delete(&api.LeaderLease{}).Error | ||
if err != nil { | ||
return err | ||
} | ||
return nil | ||
}, | ||
} | ||
} |
30 changes: 30 additions & 0 deletions
30
.../internal/migrations/20220829190000_add_deprovisioning_cluster_worker_to_leader_leases.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package migrations | ||
|
||
import ( | ||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/api" | ||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/db" | ||
"github.com/go-gormigrate/gormigrate/v2" | ||
"gorm.io/gorm" | ||
) | ||
|
||
func addDeprovisioningClusterWorkerToLeaderLeases() *gormigrate.Migration { | ||
const deprovisioningClustersWorkerType = "deprovisioning_clusters" | ||
|
||
return &gormigrate.Migration{ | ||
ID: "20220829190000", | ||
Migrate: func(tx *gorm.DB) error { | ||
if err := tx.Create(&api.LeaderLease{Expires: &db.KafkaAdditionalLeasesExpireTime, LeaseType: deprovisioningClustersWorkerType, Leader: api.NewID()}).Error; err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
}, | ||
Rollback: func(tx *gorm.DB) error { | ||
err := tx.Unscoped().Where("lease_type = ?", deprovisioningClustersWorkerType).Delete(&api.LeaderLease{}).Error | ||
if err != nil { | ||
return err | ||
} | ||
return nil | ||
}, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
113 changes: 113 additions & 0 deletions
113
internal/kafka/internal/workers/cluster_mgrs/cleanup_clusters_mgr.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
package cluster_mgrs | ||
|
||
import ( | ||
fleeterrors "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors" | ||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/sso" | ||
|
||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/services" | ||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/workers" | ||
"github.com/google/uuid" | ||
|
||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/api" | ||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/metrics" | ||
"github.com/golang/glog" | ||
|
||
"github.com/pkg/errors" | ||
) | ||
|
||
const ( | ||
cleanupClustersWorkerType = "cleanup_clusters" | ||
) | ||
|
||
// CleanupClustersManager represents a worker that periodically reconciles data plane clusters in cleanup state. | ||
type CleanupClustersManager struct { | ||
workers.BaseWorker | ||
clusterService services.ClusterService | ||
kasFleetshardOperatorAddon services.KasFleetshardOperatorAddon | ||
osdIDPKeycloakService sso.OsdKeycloakService | ||
} | ||
|
||
// NewCleanupClustersManager creates a new worker that reconciles data plane clusters in cleanup state. | ||
func NewCleanupClustersManager(reconciler workers.Reconciler, | ||
clusterService services.ClusterService, | ||
kasFleetshardOperatorAddon services.KasFleetshardOperatorAddon, | ||
osdIDPKeycloakService sso.OsdKeycloakService) *CleanupClustersManager { | ||
return &CleanupClustersManager{ | ||
BaseWorker: workers.BaseWorker{ | ||
Id: uuid.New().String(), | ||
WorkerType: cleanupClustersWorkerType, | ||
Reconciler: reconciler, | ||
}, | ||
clusterService: clusterService, | ||
kasFleetshardOperatorAddon: kasFleetshardOperatorAddon, | ||
osdIDPKeycloakService: osdIDPKeycloakService, | ||
} | ||
} | ||
|
||
// Start initializes the worker to reconcile data plane clusters in cleanup state. | ||
func (m *CleanupClustersManager) Start() { | ||
m.StartWorker(m) | ||
} | ||
|
||
// Stop causes the process for reconciling data plane clusters in cleanup state to stop. | ||
func (m *CleanupClustersManager) Stop() { | ||
m.StopWorker(m) | ||
} | ||
|
||
func (m *CleanupClustersManager) Reconcile() []error { | ||
glog.Infoln("reconciling clusters") | ||
|
||
var errList fleeterrors.ErrorList | ||
err := m.processCleanupClusters() | ||
if err != nil { | ||
errList.AddErrors(err) | ||
} | ||
|
||
return errList.ToErrorSlice() | ||
} | ||
|
||
func (m *CleanupClustersManager) processCleanupClusters() error { | ||
var errList fleeterrors.ErrorList | ||
|
||
cleanupClusters, serviceErr := m.clusterService.ListByStatus(api.ClusterCleanup) | ||
if serviceErr != nil { | ||
errList.AddErrors(errors.Wrap(serviceErr, "failed to list of cleaup clusters")) | ||
return errList | ||
} | ||
|
||
glog.Infof("cleanup clusters count = %d", len(cleanupClusters)) | ||
|
||
for _, cluster := range cleanupClusters { | ||
glog.V(10).Infof("cleanup cluster ClusterID = %s", cluster.ClusterID) | ||
metrics.UpdateClusterStatusSinceCreatedMetric(cluster, api.ClusterCleanup) | ||
if err := m.reconcileCleanupCluster(cluster); err != nil { | ||
errList.AddErrors(errors.Wrapf(err, "failed to reconcile cleanup cluster %s", cluster.ClusterID)) | ||
} | ||
} | ||
|
||
if errList.IsEmpty() { | ||
return nil | ||
} | ||
|
||
return errList | ||
} | ||
|
||
func (m *CleanupClustersManager) reconcileCleanupCluster(cluster api.Cluster) error { | ||
glog.Infof("Removing Dataplane cluster %s IDP client", cluster.ClusterID) | ||
keycloakDeregistrationErr := m.osdIDPKeycloakService.DeRegisterClientInSSO(cluster.ID) | ||
if keycloakDeregistrationErr != nil { | ||
return errors.Wrapf(keycloakDeregistrationErr, "failed to removed Dataplance cluster %s IDP client", cluster.ClusterID) | ||
} | ||
glog.Infof("Removing Dataplane cluster %s fleetshard service account", cluster.ClusterID) | ||
serviceAcountRemovalErr := m.kasFleetshardOperatorAddon.RemoveServiceAccount(cluster) | ||
if serviceAcountRemovalErr != nil { | ||
return errors.Wrapf(serviceAcountRemovalErr, "failed to removed Dataplance cluster %s fleetshard service account", cluster.ClusterID) | ||
} | ||
|
||
glog.Infof("Soft deleting the Dataplane cluster %s from the database", cluster.ClusterID) | ||
deleteError := m.clusterService.DeleteByClusterID(cluster.ClusterID) | ||
if deleteError != nil { | ||
return errors.Wrapf(deleteError, "failed to soft delete Dataplance cluster %s from the database", cluster.ClusterID) | ||
} | ||
return nil | ||
} |
202 changes: 202 additions & 0 deletions
202
internal/kafka/internal/workers/cluster_mgrs/cleanup_clusters_mgr_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
package cluster_mgrs | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/kafka/internal/services" | ||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/api" | ||
"github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/services/sso" | ||
"github.com/onsi/gomega" | ||
|
||
apiErrors "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors" | ||
) | ||
|
||
func TestCleanupClustersManager_processCleanupClusters(t *testing.T) { | ||
deprovisionCluster := api.Cluster{ | ||
Status: api.ClusterDeprovisioning, | ||
} | ||
type fields struct { | ||
clusterService services.ClusterService | ||
osdIDPKeycloakService sso.OSDKeycloakService | ||
kasFleetshardOperatorAddon services.KasFleetshardOperatorAddon | ||
} | ||
tests := []struct { | ||
name string | ||
fields fields | ||
wantErr bool | ||
}{ | ||
{ | ||
name: "should return an error if ListByStatus fails in ClusterService", | ||
fields: fields{ | ||
clusterService: &services.ClusterServiceMock{ | ||
ListByStatusFunc: func(api.ClusterStatus) ([]api.Cluster, *apiErrors.ServiceError) { | ||
return nil, apiErrors.GeneralError("failed to list by status") | ||
}, | ||
}, | ||
}, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "should return an error if reconcileCleanupCluster fails during processing cleaned up clusters", | ||
fields: fields{ | ||
clusterService: &services.ClusterServiceMock{ | ||
ListByStatusFunc: func(api.ClusterStatus) ([]api.Cluster, *apiErrors.ServiceError) { | ||
return []api.Cluster{ | ||
deprovisionCluster, | ||
}, nil | ||
}, | ||
DeleteByClusterIDFunc: func(clusterID string) *apiErrors.ServiceError { | ||
return nil | ||
}, | ||
}, | ||
osdIDPKeycloakService: &sso.OSDKeycloakServiceMock{ | ||
DeRegisterClientInSSOFunc: func(kafkaNamespace string) *apiErrors.ServiceError { | ||
return apiErrors.GeneralError("failed to deregister client in sso") | ||
}, | ||
}, | ||
kasFleetshardOperatorAddon: &services.KasFleetshardOperatorAddonMock{ | ||
RemoveServiceAccountFunc: func(cluster api.Cluster) *apiErrors.ServiceError { | ||
return apiErrors.GeneralError("failed to remove service account client in sso") | ||
}, | ||
}, | ||
}, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "should succeed if no errors are encountered", | ||
fields: fields{ | ||
clusterService: &services.ClusterServiceMock{ | ||
ListByStatusFunc: func(api.ClusterStatus) ([]api.Cluster, *apiErrors.ServiceError) { | ||
return []api.Cluster{ | ||
deprovisionCluster, | ||
}, nil | ||
}, | ||
DeleteByClusterIDFunc: func(clusterID string) *apiErrors.ServiceError { | ||
return nil | ||
}, | ||
}, | ||
osdIDPKeycloakService: &sso.OSDKeycloakServiceMock{ | ||
DeRegisterClientInSSOFunc: func(kafkaNamespace string) *apiErrors.ServiceError { | ||
return nil | ||
}, | ||
}, | ||
kasFleetshardOperatorAddon: &services.KasFleetshardOperatorAddonMock{ | ||
RemoveServiceAccountFunc: func(cluster api.Cluster) *apiErrors.ServiceError { | ||
return nil | ||
}, | ||
}, | ||
}, | ||
wantErr: false, | ||
}, | ||
} | ||
|
||
for _, testcase := range tests { | ||
tt := testcase | ||
t.Run(tt.name, func(t *testing.T) { | ||
g := gomega.NewWithT(t) | ||
c := &CleanupClustersManager{ | ||
clusterService: tt.fields.clusterService, | ||
osdIDPKeycloakService: tt.fields.osdIDPKeycloakService, | ||
kasFleetshardOperatorAddon: tt.fields.kasFleetshardOperatorAddon, | ||
} | ||
|
||
err := c.processCleanupClusters() | ||
if !tt.wantErr { | ||
g.Expect(err).ToNot(gomega.HaveOccurred()) | ||
} else { | ||
g.Expect(err).To(gomega.HaveOccurred()) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestCleanupClustersManager_reconcileCleanupCluster(t *testing.T) { | ||
type fields struct { | ||
clusterService services.ClusterService | ||
osdIDPKeycloakService sso.OSDKeycloakService | ||
kasFleetshardOperatorAddon services.KasFleetshardOperatorAddon | ||
} | ||
tests := []struct { | ||
name string | ||
fields fields | ||
arg api.Cluster | ||
wantErr bool | ||
}{ | ||
{ | ||
name: "receives an error when remove kas-fleetshard-operator service account fails", | ||
fields: fields{ | ||
clusterService: &services.ClusterServiceMock{ | ||
UpdateStatusFunc: func(cluster api.Cluster, status api.ClusterStatus) error { | ||
return nil | ||
}, | ||
}, | ||
osdIDPKeycloakService: &sso.OSDKeycloakServiceMock{ | ||
DeRegisterClientInSSOFunc: func(kafkaNamespace string) *apiErrors.ServiceError { | ||
return nil | ||
}, | ||
}, | ||
kasFleetshardOperatorAddon: &services.KasFleetshardOperatorAddonMock{ | ||
RemoveServiceAccountFunc: func(cluster api.Cluster) *apiErrors.ServiceError { | ||
return &apiErrors.ServiceError{} | ||
}, | ||
}, | ||
}, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "receives an error when soft delete cluster from database fails", | ||
fields: fields{ | ||
clusterService: &services.ClusterServiceMock{ | ||
DeleteByClusterIDFunc: func(clusterID string) *apiErrors.ServiceError { | ||
return &apiErrors.ServiceError{} | ||
}, | ||
}, | ||
osdIDPKeycloakService: &sso.OSDKeycloakServiceMock{ | ||
DeRegisterClientInSSOFunc: func(kafkaNamespace string) *apiErrors.ServiceError { | ||
return nil | ||
}, | ||
}, | ||
kasFleetshardOperatorAddon: &services.KasFleetshardOperatorAddonMock{ | ||
RemoveServiceAccountFunc: func(cluster api.Cluster) *apiErrors.ServiceError { | ||
return nil | ||
}, | ||
}, | ||
}, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "successful deletion of an OSD cluster", | ||
fields: fields{ | ||
clusterService: &services.ClusterServiceMock{ | ||
DeleteByClusterIDFunc: func(clusterID string) *apiErrors.ServiceError { | ||
return nil | ||
}, | ||
}, | ||
osdIDPKeycloakService: &sso.OSDKeycloakServiceMock{ | ||
DeRegisterClientInSSOFunc: func(kafkaNamespace string) *apiErrors.ServiceError { | ||
return nil | ||
}, | ||
}, | ||
kasFleetshardOperatorAddon: &services.KasFleetshardOperatorAddonMock{ | ||
RemoveServiceAccountFunc: func(cluster api.Cluster) *apiErrors.ServiceError { | ||
return nil | ||
}, | ||
}, | ||
}, | ||
wantErr: false, | ||
}, | ||
} | ||
|
||
for _, testcase := range tests { | ||
tt := testcase | ||
t.Run(tt.name, func(t *testing.T) { | ||
g := gomega.NewWithT(t) | ||
c := &CleanupClustersManager{ | ||
clusterService: tt.fields.clusterService, | ||
osdIDPKeycloakService: tt.fields.osdIDPKeycloakService, | ||
kasFleetshardOperatorAddon: tt.fields.kasFleetshardOperatorAddon, | ||
} | ||
g.Expect(c.reconcileCleanupCluster(tt.arg) != nil).To(gomega.Equal(tt.wantErr)) | ||
}) | ||
} | ||
} |
Oops, something went wrong.