Skip to content

Commit

Permalink
customizing secret event sources (#88)
Browse files Browse the repository at this point in the history
Signed-off-by: liuwei <liuweixa@redhat.com>
  • Loading branch information
skeeey committed Oct 14, 2021
1 parent d1d7bb5 commit 2fb6f7e
Show file tree
Hide file tree
Showing 26 changed files with 534 additions and 300 deletions.
58 changes: 51 additions & 7 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ package main
import (
"fmt"
"os"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

"github.com/open-cluster-management/managedcluster-import-controller/pkg/constants"
"github.com/open-cluster-management/managedcluster-import-controller/pkg/controller"
"github.com/open-cluster-management/managedcluster-import-controller/pkg/helpers"
imgregistryv1alpha1 "github.com/open-cluster-management/multicloud-operators-foundation/pkg/apis/imageregistry/v1alpha1"
Expand All @@ -28,10 +30,14 @@ import (
hivev1 "github.com/openshift/hive/apis/hive/v1"

apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
informerscorev1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/logs"

asv1beta1 "github.com/openshift/assisted-service/api/v1beta1"
Expand Down Expand Up @@ -64,6 +70,8 @@ func main() {

ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

ctx := ctrl.SetupSignalHandler()

// Get a config to talk to the kube-apiserver
cfg, err := ctrl.GetConfig()
if err != nil {
Expand All @@ -89,6 +97,34 @@ func main() {
os.Exit(1)
}

importSecretInformer := informerscorev1.NewFilteredSecretInformer(
kubeClient,
metav1.NamespaceAll,
10*time.Minute,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(listOptions *metav1.ListOptions) {
selector := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: constants.ClusterImportSecretLabel,
Operator: metav1.LabelSelectorOpExists,
},
},
}
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
},
)

autoimportSecretInformer := informerscorev1.NewFilteredSecretInformer(
kubeClient,
metav1.NamespaceAll,
10*time.Minute,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(listOptions *metav1.ListOptions) {
listOptions.FieldSelector = fields.OneTermEqualSelector("metadata.name", constants.AutoImportSecretName).String()
},
)

// Create controller-runtime manager
mgr, err := ctrl.NewManager(cfg, manager.Options{
Scheme: scheme,
Expand All @@ -102,18 +138,26 @@ func main() {
}

setupLog.Info("Registering Controllers")
if err := controller.AddToManager(mgr, &helpers.ClientHolder{
KubeClient: kubeClient,
APIExtensionsClient: apiExtensionsClient,
OperatorClient: operatorClient,
RuntimeClient: mgr.GetClient(),
}); err != nil {
if err := controller.AddToManager(
mgr,
&helpers.ClientHolder{
KubeClient: kubeClient,
APIExtensionsClient: apiExtensionsClient,
OperatorClient: operatorClient,
RuntimeClient: mgr.GetClient(),
},
importSecretInformer,
autoimportSecretInformer,
); err != nil {
setupLog.Error(err, "failed to register controller")
os.Exit(1)
}

go importSecretInformer.Run(ctx.Done())
go autoimportSecretInformer.Run(ctx.Done())

setupLog.Info("Starting Controller Manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "failed to start manager")
os.Exit(1)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const ImportFinalizer string = "managedcluster-import-controller.open-cluster-ma

const SelfManagedLabel string = "local-cluster"

const ClusterImportSecretLabel = "managedcluster-import-controller.open-cluster-management.io/import-secret"

const (
CreatedViaAnnotation = "open-cluster-management/created-via"
CreatedViaAI = "assisted-installer"
Expand Down
26 changes: 13 additions & 13 deletions pkg/controller/autoimport/autoimport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes"

"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -31,8 +32,9 @@ var log = logf.Log.WithName(controllerName)

// ReconcileAutoImport reconciles the managed cluster auto import secret to import the managed cluster
type ReconcileAutoImport struct {
client client.Client
recorder events.Recorder
client client.Client
kubeClient kubernetes.Interface
recorder events.Recorder
}

// blank assignment to verify that ReconcileAutoImport implements reconcile.Reconciler
Expand All @@ -58,11 +60,8 @@ func (r *ReconcileAutoImport) Reconcile(ctx context.Context, request reconcile.R
return reconcile.Result{}, err
}

autoImportSecret := &corev1.Secret{}
err = r.client.Get(ctx, types.NamespacedName{
Namespace: managedClusterName,
Name: constants.AutoImportSecretName,
}, autoImportSecret)
// TODO: we will use list instead of get to reduce the request in the future
autoImportSecret, err := r.kubeClient.CoreV1().Secrets(managedClusterName).Get(ctx, constants.AutoImportSecretName, metav1.GetOptions{})
if errors.IsNotFound(err) {
// the auto import secret could have been deleted, do nothing
return reconcile.Result{}, nil
Expand All @@ -72,8 +71,7 @@ func (r *ReconcileAutoImport) Reconcile(ctx context.Context, request reconcile.R
}

importSecretName := fmt.Sprintf("%s-%s", managedClusterName, constants.ImportSecretNameSuffix)
importSecret := &corev1.Secret{}
err = r.client.Get(ctx, types.NamespacedName{Namespace: managedClusterName, Name: importSecretName}, importSecret)
importSecret, err := r.kubeClient.CoreV1().Secrets(managedClusterName).Get(ctx, importSecretName, metav1.GetOptions{})
if errors.IsNotFound(err) {
// there is no import secret, do nothing
return reconcile.Result{}, nil
Expand Down Expand Up @@ -101,11 +99,11 @@ func (r *ReconcileAutoImport) Reconcile(ctx context.Context, request reconcile.R
importCondition.Message = fmt.Sprintf("Unable to import %s: %s", managedClusterName, err.Error())
importCondition.Reason = "ManagedClusterNotImported"

errs = append(errs, err, r.updateAutoImportRetryTimes(ctx, autoImportSecret))
errs = append(errs, err, r.updateAutoImportRetryTimes(ctx, autoImportSecret.DeepCopy()))
}

if len(errs) == 0 {
err := r.client.Delete(ctx, autoImportSecret)
err := r.kubeClient.CoreV1().Secrets(autoImportSecret.Namespace).Delete(ctx, autoImportSecret.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, err)
}
Expand Down Expand Up @@ -133,7 +131,8 @@ func (r *ReconcileAutoImport) updateAutoImportRetryTimes(ctx context.Context, se
autoImportRetry--
if autoImportRetry < 0 {
// stop retry, delete the auto-import-secret
if err := r.client.Delete(ctx, secret); err != nil {
err := r.kubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx, secret.Name, metav1.DeleteOptions{})
if err != nil {
return err
}
r.recorder.Eventf("AutoImportSecretDeleted",
Expand All @@ -142,5 +141,6 @@ func (r *ReconcileAutoImport) updateAutoImportRetryTimes(ctx context.Context, se
}

secret.Data[autoImportRetryName] = []byte(strconv.Itoa(autoImportRetry))
return r.client.Update(ctx, secret)
_, err = r.kubeClient.CoreV1().Secrets(secret.Namespace).Update(ctx, secret, metav1.UpdateOptions{})
return err
}
13 changes: 11 additions & 2 deletions pkg/controller/autoimport/autoimport_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"

"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -40,6 +42,7 @@ func TestReconcile(t *testing.T) {
cases := []struct {
name string
objs []client.Object
secrets []runtime.Object
expectedErr bool
}{
{
Expand All @@ -56,6 +59,7 @@ func TestReconcile(t *testing.T) {
},
},
},
secrets: []runtime.Object{},
expectedErr: false,
},
{
Expand All @@ -66,6 +70,8 @@ func TestReconcile(t *testing.T) {
Name: "test",
},
},
},
secrets: []runtime.Object{
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "auto-import-secret",
Expand All @@ -83,6 +89,8 @@ func TestReconcile(t *testing.T) {
Name: "test",
},
},
},
secrets: []runtime.Object{
testinghelpers.GetImportSecret("test"),
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -103,8 +111,9 @@ func TestReconcile(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
r := &ReconcileAutoImport{
client: fake.NewClientBuilder().WithScheme(testscheme).WithObjects(c.objs...).Build(),
recorder: eventstesting.NewTestingEventRecorder(t),
client: fake.NewClientBuilder().WithScheme(testscheme).WithObjects(c.objs...).Build(),
kubeClient: kubefake.NewSimpleClientset(c.secrets...),
recorder: eventstesting.NewTestingEventRecorder(t),
}

req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "test"}}
Expand Down
57 changes: 31 additions & 26 deletions pkg/controller/autoimport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,40 @@
package autoimport

import (
"strings"

"github.com/open-cluster-management/managedcluster-import-controller/pkg/constants"
"github.com/open-cluster-management/managedcluster-import-controller/pkg/helpers"
"github.com/open-cluster-management/managedcluster-import-controller/pkg/source"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

const controllerName = "autoimport-controller"

// Add creates a new autoimport controller and adds it to the Manager.
// The Manager will set fields on the Controller and Start it when the Manager is Started.
func Add(mgr manager.Manager, clientHolder *helpers.ClientHolder) (string, error) {
return controllerName, add(mgr, newReconciler(clientHolder))
func Add(mgr manager.Manager, clientHolder *helpers.ClientHolder,
importSecretInformer, autoImportSecretInformer cache.SharedIndexInformer) (string, error) {
return controllerName, add(importSecretInformer, autoImportSecretInformer, mgr, newReconciler(clientHolder))
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(clientHolder *helpers.ClientHolder) reconcile.Reconciler {
return &ReconcileAutoImport{
client: clientHolder.RuntimeClient,
recorder: helpers.NewEventRecorder(clientHolder.KubeClient, controllerName),
client: clientHolder.RuntimeClient,
kubeClient: clientHolder.KubeClient,
recorder: helpers.NewEventRecorder(clientHolder.KubeClient, controllerName),
}
}

// adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
func add(importSecretInformer, autoImportSecretInformer cache.SharedIndexInformer, mgr manager.Manager, r reconcile.Reconciler) error {
c, err := controller.New(controllerName, mgr, controller.Options{
Reconciler: r,
MaxConcurrentReconciles: helpers.GetMaxConcurrentReconciles(),
Expand All @@ -47,30 +46,36 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

// watch the import secrets
if err := c.Watch(
&source.Kind{Type: &corev1.Secret{}},
&handler.EnqueueRequestForObject{},
source.NewImportSecretSource(importSecretInformer),
&source.ManagedClusterSecretEventHandler{},
predicate.Predicate(predicate.Funcs{
GenericFunc: func(e event.GenericEvent) bool { return false },
DeleteFunc: func(e event.DeleteEvent) bool { return false },
CreateFunc: func(e event.CreateEvent) bool {
secretName := e.Object.GetName()
// only handle the auto import secret and managed cluster import secret
if secretName == constants.AutoImportSecretName ||
strings.HasSuffix(secretName, constants.ImportSecretNameSuffix) {
return true
CreateFunc: func(e event.CreateEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
new, okNew := e.ObjectNew.(*corev1.Secret)
old, okOld := e.ObjectOld.(*corev1.Secret)
if okNew && okOld {
return !equality.Semantic.DeepEqual(old.Data, new.Data)
}

return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
secretName := e.ObjectNew.GetName()
// only handle the auto import secret and managed cluster import secret
if secretName != constants.AutoImportSecretName &&
!strings.HasSuffix(secretName, constants.ImportSecretNameSuffix) {
return false
}
}),
); err != nil {
return err
}

// watch the auto-import secrets
if err := c.Watch(
source.NewAutoImportSecretSource(autoImportSecretInformer),
&source.ManagedClusterSecretEventHandler{},
predicate.Predicate(predicate.Funcs{
GenericFunc: func(e event.GenericEvent) bool { return false },
DeleteFunc: func(e event.DeleteEvent) bool { return false },
CreateFunc: func(e event.CreateEvent) bool { return true },
UpdateFunc: func(e event.UpdateEvent) bool {
new, okNew := e.ObjectNew.(*corev1.Secret)
old, okOld := e.ObjectOld.(*corev1.Secret)
if okNew && okOld {
Expand Down

0 comments on commit 2fb6f7e

Please sign in to comment.