Merge pull request #269 from red-hat-storage/sync_us--main
Syncing latest changes from upstream main for ramen
ShyamsundarR committed May 16, 2024
2 parents aae5e10 + 797c1a8 commit b905207
Showing 26 changed files with 139 additions and 59 deletions.
3 changes: 3 additions & 0 deletions Makefile
@@ -173,6 +173,9 @@ test-drpc: generate manifests envtest ## Run DRPlacementControl tests.
test-drcluster: generate manifests envtest ## Run DRCluster tests.
	go test ./controllers -coverprofile cover.out -ginkgo.focus DRClusterController

test-drpolicy: generate manifests envtest ## Run DRPolicy tests.
	go test ./controllers -coverprofile cover.out -ginkgo.focus DRPolicyController

test-util: generate manifests envtest ## Run util tests.
	go test ./controllers/util -coverprofile cover.out

1 change: 1 addition & 0 deletions api/v1alpha1/drplacementcontrol_types.go
@@ -101,6 +101,7 @@ const (
	ProgressionUpdatingPlRule = ProgressionStatus("UpdatingPlRule")
	ProgressionWaitForReadiness = ProgressionStatus("WaitForReadiness")
	ProgressionCleaningUp = ProgressionStatus("Cleaning Up")
	ProgressionWaitOnUserToCleanUp = ProgressionStatus("WaitOnUserToCleanUp")
	ProgressionCheckingFailoverPrequisites = ProgressionStatus("CheckingFailoverPrequisites")
	ProgressionFailingOverToCluster = ProgressionStatus("FailingOverToCluster")
	ProgressionWaitForFencing = ProgressionStatus("WaitForFencing")
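
The new ProgressionWaitOnUserToCleanUp status lets the hub report that failover or relocate is paused until the user removes the workload from the protected namespaces. A minimal sketch of how a consumer of the DRPC API might check for it — assuming, as these constants suggest, that DRPlacementControlStatus exposes the current value in a Progression field — could look like this:

package main

import (
	"fmt"

	rmn "github.com/ramendr/ramen/api/v1alpha1"
)

// userCleanupPending reports whether the DRPC is waiting on manual cleanup.
// Sketch only: the Status.Progression field name is an assumption based on
// the ProgressionStatus constants above.
func userCleanupPending(drpc *rmn.DRPlacementControl) bool {
	return drpc.Status.Progression == rmn.ProgressionWaitOnUserToCleanUp
}

func main() {
	drpc := &rmn.DRPlacementControl{}
	drpc.Status.Progression = rmn.ProgressionWaitOnUserToCleanUp
	fmt.Println("user cleanup pending:", userCleanupPending(drpc))
}
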
2 changes: 2 additions & 0 deletions config/dr-cluster/default/kustomization.yaml
@@ -31,6 +31,8 @@ transformers:
  path: metadata/labels
- kind: Service
  path: spec/selector
- kind: ConfigMap
  path: metadata/labels
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# crd/kustomization.yaml
10 changes: 10 additions & 0 deletions config/hub/rbac/role.yaml
@@ -11,6 +11,16 @@ rules:
  verbs:
  - list
  - watch
- apiGroups:
  - ""
  resources:
  - namespaces
  verbs:
  - create
  - get
  - list
  - update
  - watch
- apiGroups:
  - ""
  resources:
10 changes: 10 additions & 0 deletions config/rbac/role.yaml
@@ -11,6 +11,16 @@ rules:
  verbs:
  - list
  - watch
- apiGroups:
  - ""
  resources:
  - namespaces
  verbs:
  - create
  - get
  - list
  - update
  - watch
- apiGroups:
  - ""
  resources:
6 changes: 6 additions & 0 deletions controllers/drclusters.go
@@ -119,6 +119,12 @@ func objectsToDeploy(hubOperatorRamenConfig *rmn.RamenConfig) ([]interface{}, er
		return nil, err
	}

	if drClusterOperatorRamenConfig.RamenOpsNamespace != "" {
		objects = append(objects,
			util.Namespace(drClusterOperatorRamenConfig.RamenOpsNamespace),
		)
	}

	return append(objects,
		util.Namespace(drClusterOperatorNamespaceName),
		olmClusterRole,
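
For context, util.Namespace is the existing helper that wraps a namespace name into a manifest object appended to the deploy list; its implementation is not part of this diff, but a hedged approximation of its shape is:

// Hypothetical shape of util.Namespace (not part of this change): it wraps a
// name in a corev1.Namespace so the object can be shipped to managed clusters
// alongside the other operator manifests.
func Namespace(name string) *corev1.Namespace {
	return &corev1.Namespace{
		TypeMeta:   metav1.TypeMeta{Kind: "Namespace", APIVersion: "v1"},
		ObjectMeta: metav1.ObjectMeta{Name: name},
	}
}
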
11 changes: 11 additions & 0 deletions controllers/drplacementcontrol.go
@@ -1865,6 +1865,11 @@ func (d *DRPCInstance) ensureVRGManifestWorkOnClusterDeleted(clusterName string)
	}

	d.log.Info("Request not complete yet", "cluster", clusterName)

	if d.instance.Spec.ProtectedNamespaces != nil && len(*d.instance.Spec.ProtectedNamespaces) > 0 {
		d.setProgression(rmn.ProgressionWaitOnUserToCleanUp)
	}

	// IF we get here, either the VRG has not transitioned to secondary (yet) or delete didn't succeed. In either cases,
	// we need to make sure that the VRG object is deleted. IOW, we still have to wait
	return !done, nil
@@ -1880,6 +1885,10 @@ func (d *DRPCInstance) ensureVRGIsSecondaryEverywhere(clusterToSkip string) bool
			continue
		}

		if d.instance.Spec.ProtectedNamespaces != nil && len(*d.instance.Spec.ProtectedNamespaces) > 0 {
			d.setProgression(rmn.ProgressionWaitOnUserToCleanUp)
		}

		if !d.ensureVRGIsSecondaryOnCluster(clusterName) {
			d.log.Info("Still waiting for VRG to transition to secondary", "cluster", clusterName)

@@ -2199,6 +2208,7 @@ failoverProgressions are used to indicate progression during failover action pro
ProgressionUpdatedPlacement,
ProgressionCompleted,
ProgressionCleaningUp,
ProgressionWaitOnUserToCleanUp,
}
relocateProgressions are used to indicate progression during relocate action processing
@@ -2211,6 +2221,7 @@ relocateProgressions are used to indicate progression during relocate action pro
rmn.ProgressionRunningFinalSync,
rmn.ProgressionFinalSyncComplete,
rmn.ProgressionEnsuringVolumesAreSecondary,
rmn.ProgressionWaitOnUserToCleanUp,
}
postRelocateProgressions := {
18 changes: 3 additions & 15 deletions controllers/drplacementcontrol_controller.go
@@ -403,11 +403,7 @@ func DRPCsUsingDRCluster(k8sclient client.Client, log logr.Logger, drcluster *rm
for i := range drpolicies.Items {
drpolicy := &drpolicies.Items[i]

for _, clusterName := range drpolicy.Spec.DRClusters {
if clusterName != drcluster.GetName() {
continue
}

if rmnutil.DrpolicyContainsDrcluster(drpolicy, drcluster.GetName()) {
log.Info("Found DRPolicy referencing DRCluster", "drpolicy", drpolicy.GetName())

drpcs, err := DRPCsUsingDRPolicy(k8sclient, log, drpolicy)
@@ -418,8 +414,6 @@ for _, drpc := range drpcs {
for _, drpc := range drpcs {
found = append(found, DRPCAndPolicy{drpc: drpc, drPolicy: drpolicy})
}

break
}
}

@@ -477,11 +471,7 @@ func DRPCsFailingOverToCluster(k8sclient client.Client, log logr.Logger, drclust
for drpolicyIdx := range drpolicies.Items {
drpolicy := &drpolicies.Items[drpolicyIdx]

for _, specCluster := range drpolicy.Spec.DRClusters {
if specCluster != drcluster {
continue
}

if rmnutil.DrpolicyContainsDrcluster(drpolicy, drcluster) {
drClusters, err := getDRClusters(context.TODO(), k8sclient, drpolicy)
if err != nil || len(drClusters) <= 1 {
log.Error(err, "Failed to get DRClusters")
Expand Down Expand Up @@ -511,8 +501,6 @@ func DRPCsFailingOverToCluster(k8sclient client.Client, log logr.Logger, drclust

drpcCollections = append(drpcCollections, dprcCollection)
}

break
}
}

@@ -2290,7 +2278,7 @@ func addOrUpdateCondition(conditions *[]metav1.Condition, conditionType string,

// Initial creation of the DRPC status condition. This will also preserve the ordering of conditions in the array
func ensureDRPCConditionsInited(conditions *[]metav1.Condition, observedGeneration int64, message string) {
const DRPCTotalConditions = 2
const DRPCTotalConditions = 3
if len(*conditions) == DRPCTotalConditions {
return
}
9 changes: 1 addition & 8 deletions controllers/drpolicy.go
@@ -175,16 +175,9 @@ func drClusterListMustHaveS3Profiles(drpolicies rmn.DRPolicyList,
continue
}

for _, cluster := range util.DRPolicyClusterNames(&drpolicies.Items[idx]) {
// Skip if not the current cluster
if cluster != clusterName {
continue
}

if util.DrpolicyContainsDrcluster(&drpolicies.Items[idx], clusterName) {
// Add all S3Profiles across clusters in this policy to the current cluster
mustHaveS3Profiles = mustHaveS3Profiles.Union(util.DRPolicyS3Profiles(&drpolicies.Items[idx], drclusters.Items))

break
}
}

27 changes: 15 additions & 12 deletions controllers/drpolicy_controller.go
@@ -55,6 +55,7 @@ const ReasonDRClustersUnavailable = "DRClustersUnavailable"
// +kubebuilder:rbac:groups=work.open-cluster-management.io,resources=manifestworks,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=list;watch
// +kubebuilder:rbac:groups="",resources=secrets,verbs=list;watch
// +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;create;update
// +kubebuilder:rbac:groups="policy.open-cluster-management.io",resources=placementbindings,verbs=list;watch
// +kubebuilder:rbac:groups="policy.open-cluster-management.io",resources=policies,verbs=list;watch
// +kubebuilder:rbac:groups="",namespace=system,resources=secrets,verbs=get;update
@@ -71,7 +72,7 @@ const ReasonDRClustersUnavailable = "DRClustersUnavailable"
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.9.2/pkg/reconcile
//
//nolint:cyclop
//nolint:cyclop,funlen
func (r *DRPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithValues("DRPolicy", req.NamespacedName.Name, "rid", uuid.New())
log.Info("reconcile enter")
@@ -90,6 +91,11 @@ func (r *DRPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
		return ctrl.Result{}, fmt.Errorf("config map get: %w", u.validatedSetFalse("ConfigMapGetFailed", err))
	}

	if err := util.CreateRamenOpsNamespace(ctx, r.Client, ramenConfig); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to create RamenOpsNamespace: %w",
			u.validatedSetFalse("NamespaceCreateFailed", err))
	}

	drclusters := &ramen.DRClusterList{}

	if err := r.Client.List(ctx, drclusters); err != nil {
@@ -484,18 +490,15 @@ func (r *DRPolicyReconciler) drClusterMapFunc(ctx context.Context, drcluster cli

requests := make([]reconcile.Request, 0)

for _, drpolicy := range drpolicies.Items {
for _, specCluster := range drpolicy.Spec.DRClusters {
if specCluster == drcluster.GetName() {
add := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: drpolicy.GetName(),
},
}
requests = append(requests, add)

break
for idx := range drpolicies.Items {
drpolicy := &drpolicies.Items[idx]
if util.DrpolicyContainsDrcluster(drpolicy, drcluster.GetName()) {
add := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: drpolicy.GetName(),
},
}
requests = append(requests, add)
}
}

2 changes: 1 addition & 1 deletion controllers/drpolicy_controller_test.go
@@ -23,7 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("DrpolicyController", func() {
var _ = Describe("DRPolicyController", func() {
validatedConditionExpect := func(drpolicy *ramen.DRPolicy, status metav1.ConditionStatus,
messageMatcher gomegaTypes.GomegaMatcher,
) {
10 changes: 10 additions & 0 deletions controllers/util/drpolicy_util.go
@@ -113,3 +113,13 @@ func GetSecondsFromSchedulingInterval(drpolicy *rmn.DRPolicy) (float64, error) {
		return s.Seconds(), err
	}
}

func DrpolicyContainsDrcluster(drpolicy *rmn.DRPolicy, drcluster string) bool {
	for _, managedCluster := range DRPolicyClusterNames(drpolicy) {
		if managedCluster == drcluster {
			return true
		}
	}

	return false
}
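
The new helper centralizes a cluster-membership check that several call sites in this commit previously open-coded as a loop with a break. A small, hedged test sketch — the controllers/util import path and the Spec.DRClusters field layout are taken from the other hunks in this commit — might look like this:

package util_test

import (
	"testing"

	rmn "github.com/ramendr/ramen/api/v1alpha1"
	util "github.com/ramendr/ramen/controllers/util"
)

// Sketch only: exercises DrpolicyContainsDrcluster against a hand-built DRPolicy.
func TestDrpolicyContainsDrcluster(t *testing.T) {
	drpolicy := &rmn.DRPolicy{
		Spec: rmn.DRPolicySpec{DRClusters: []string{"east", "west"}},
	}

	if !util.DrpolicyContainsDrcluster(drpolicy, "east") {
		t.Error("expected cluster 'east' to be reported as part of the DRPolicy")
	}

	if util.DrpolicyContainsDrcluster(drpolicy, "north") {
		t.Error("did not expect cluster 'north' to be reported as part of the DRPolicy")
	}
}
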
32 changes: 32 additions & 0 deletions controllers/util/misc.go
@@ -7,8 +7,12 @@ import (
	"context"
	"reflect"

	rmn "github.com/ramendr/ramen/api/v1alpha1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
@@ -180,3 +184,31 @@ func UpdateStringMap(dst *map[string]string, src map[string]string) {
func OptionalEqual(a, b string) bool {
	return a == "" || b == "" || a == b
}

func CreateRamenOpsNamespace(ctx context.Context, k8sClient client.Client, ramenconfig *rmn.RamenConfig) error {
	if ramenconfig.RamenOpsNamespace == "" {
		return nil
	}

	return CreateNamespaceIfNotExists(ctx, k8sClient, ramenconfig.RamenOpsNamespace)
}

func CreateNamespaceIfNotExists(ctx context.Context, k8sClient client.Client, namespace string) error {
	ns := &corev1.Namespace{}

	err := k8sClient.Get(ctx, types.NamespacedName{Name: namespace}, ns)
	if err != nil {
		if errors.IsNotFound(err) {
			ns.Name = namespace

			err = k8sClient.Create(ctx, ns)
			if err != nil {
				return err
			}
		} else {
			return err
		}
	}

	return nil
}
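
CreateNamespaceIfNotExists treats an already existing namespace as success and only propagates other errors, which keeps the DRPolicy reconcile path idempotent. A hedged exercise of the helper against controller-runtime's fake client — the controllers/util import path and the "ramen-ops" namespace name are assumptions for illustration — might look like this:

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	util "github.com/ramendr/ramen/controllers/util"
)

func main() {
	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		panic(err)
	}

	// In-memory fake client; no real cluster is needed for the sketch.
	k8sClient := fake.NewClientBuilder().WithScheme(scheme).Build()
	ctx := context.Background()

	// The first call creates the namespace; the second finds it and is a no-op.
	for i := 1; i <= 2; i++ {
		if err := util.CreateNamespaceIfNotExists(ctx, k8sClient, "ramen-ops"); err != nil {
			panic(err)
		}

		fmt.Println("call", i, "succeeded")
	}
}

Note that the helper would still surface an AlreadyExists error if another actor created the namespace between the Get and the Create, since only NotFound is handled specially.
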
2 changes: 1 addition & 1 deletion test/addons/cdi/dv/dv.yaml
@@ -10,7 +10,7 @@ spec:
storage:
accessModes:
- ReadWriteMany
storageClassName: csi-hostpath-sc
storageClassName: rook-ceph-block
resources:
requests:
storage: 128Mi
2 changes: 1 addition & 1 deletion test/addons/cdi/pvc/pvc.yaml
@@ -16,5 +16,5 @@ spec:
resources:
requests:
storage: 128Mi
storageClassName: csi-hostpath-sc
storageClassName: rook-ceph-block
volumeMode: Block
4 changes: 2 additions & 2 deletions test/addons/volsync/destination/replication-dst.yaml
@@ -12,8 +12,8 @@ spec:
copyMethod: Snapshot
capacity: 1Gi
accessModes: [ReadWriteOnce]
storageClassName: csi-hostpath-sc
volumeSnapshotClassName: csi-hostpath-snapclass
storageClassName: rook-cephfs
volumeSnapshotClassName: csi-cephfsplugin-snapclass
moverSecurityContext:
runAsUser: 10000
runAsGroup: 10000
2 changes: 1 addition & 1 deletion test/addons/volsync/source/pvc.yaml
@@ -11,7 +11,7 @@ metadata:
appname: busybox
spec:
accessModes: [ReadWriteOnce]
storageClassName: csi-hostpath-sc
storageClassName: rook-cephfs
resources:
requests:
storage: 1Gi
2 changes: 1 addition & 1 deletion test/addons/volsync/source/replication-src.yaml
@@ -15,7 +15,7 @@ spec:
keySecret: volsync-rsync-tls-busybox-dst
address: volsync-rsync-tls-dst-busybox-dst.busybox.svc.clusterset.local
copyMethod: Snapshot
volumeSnapshotClassName: csi-hostpath-snapclass
volumeSnapshotClassName: csi-cephfsplugin-snapclass
moverSecurityContext:
runAsUser: 10000
runAsGroup: 10000
2 changes: 1 addition & 1 deletion test/drenv/cache.py
@@ -22,7 +22,7 @@ def fetch(kustomization_dir, dest, log=print):
log(f"Fetching {dest}")
dest_dir = os.path.dirname(dest)
os.makedirs(dest_dir, exist_ok=True)
tmp = dest + ".tmp"
tmp = dest + f".tmp.{os.getpid()}"
try:
_build_kustomization(kustomization_dir, tmp)
os.rename(tmp, dest)
5 changes: 4 additions & 1 deletion test/drenv/commands.py
@@ -217,14 +217,17 @@ def stream(proc, input=None, bufsize=32 << 10, timeout=None):
except BrokenPipeError:
pass

# Use only if input is not None, but it helps pylint.
input_view = ""
input_offset = 0

with _Selector() as sel:
for f, src in (proc.stdout, OUT), (proc.stderr, ERR):
if f and not f.closed:
sel.register(f, selectors.EVENT_READ, src)
if input:
sel.register(proc.stdin, selectors.EVENT_WRITE)
input_view = memoryview(input.encode())
input_offset = 0

while sel.get_map():
remaining = _remaining_time(deadline)
12 changes: 8 additions & 4 deletions test/envs/kubevirt.yaml
@@ -15,10 +15,14 @@ profiles:
network: $network
cpus: 4
memory: "8g"
addons:
- csi-hostpath-driver
extra_disks: 1
disk_size: "50g"
workers:
- addons:
- name: kubevirt
- addons:
- name: rook-operator
- name: rook-cluster
- name: rook-toolbox
- name: rook-pool
- name: cdi
- addons:
- name: kubevirt
