Skip to content

Commit

Permalink
Fix apache#1943: allow multiple operators to reconcile label filtered…
Browse files Browse the repository at this point in the history
… resources
  • Loading branch information
nicolaferraro committed Oct 7, 2021
1 parent 5938707 commit e5c4eea
Show file tree
Hide file tree
Showing 18 changed files with 351 additions and 35 deletions.
9 changes: 5 additions & 4 deletions e2e/builder/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ package builder

import (
"os"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
"testing"

ctrl "sigs.k8s.io/controller-runtime/pkg/client"

. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"

Expand All @@ -51,7 +52,7 @@ func TestRunGlobalInstall(t *testing.T) {
}

test := func(operatorNamespace string) {
Expect(Kamel("install", "-n", operatorNamespace, "--global").Execute()).To(Succeed())
Expect(Kamel("install", "-n", operatorNamespace, "--global", "--force").Execute()).To(Succeed())

t.Run("Global test on namespace with platform", func(t *testing.T) {
WithNewTestNamespace(t, func(ns2 string) {
Expand Down Expand Up @@ -146,9 +147,9 @@ func TestRunGlobalInstall(t *testing.T) {
// global operators are always installed in the openshift-operators namespace
RegisterTestingT(t)
test("openshift-operators")
}else {
} else {
// create new namespace for the global operator
WithNewTestNamespace(t,test)
WithNewTestNamespace(t, test)
}
}

Expand Down
2 changes: 1 addition & 1 deletion e2e/common/cli/global_kamelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestRunGlobalKamelet(t *testing.T) {
}

WithNewTestNamespace(t, func(ns string) {
Expect(Kamel("install", "-n", ns, "--global").Execute()).To(Succeed())
Expect(Kamel("install", "-n", ns, "--global", "--force").Execute()).To(Succeed())

Expect(CreateTimerKamelet(ns, "my-own-timer-source")()).To(Succeed())

Expand Down
136 changes: 136 additions & 0 deletions e2e/common/operator_id_filtering_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
//go:build integration
// +build integration

// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration"

/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import (
"fmt"
"os"
"testing"
"time"

. "github.com/apache/camel-k/e2e/support"
camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/util/openshift"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
)

func TestOperatorIDFiltering(t *testing.T) {
forceGlobalTest := os.Getenv("CAMEL_K_FORCE_GLOBAL_TEST") == "true"
if !forceGlobalTest {
ocp, err := openshift.IsOpenShift(TestClient())
assert.Nil(t, err)
if ocp {
t.Skip("Prefer not to run on OpenShift to avoid giving more permissions to the user running tests")
return
}
}

WithNewTestNamespace(t, func(ns string) {
WithNewTestNamespace(t, func(nsop1 string) {
WithNewTestNamespace(t, func(nsop2 string) {
Expect(Kamel("install", "-n", nsop1, "--operator-env-vars", "KAMEL_OPERATOR_ID=operator-1", "--global", "--force").Execute()).To(Succeed())
Expect(AssignPlatformToOperator(nsop1, "operator-1")).To(Succeed())
Eventually(PlatformPhase(nsop1), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPlatformPhaseReady))

Expect(Kamel("install", "-n", nsop2, "--operator-env-vars", "KAMEL_OPERATOR_ID=operator-2", "--global", "--force").Execute()).To(Succeed())
Expect(AssignPlatformToOperator(nsop2, "operator-2")).To(Succeed())
Eventually(PlatformPhase(nsop2), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPlatformPhaseReady))

t.Run("Operators ignore non-scoped integrations", func(t *testing.T) {
RegisterTestingT(t)

Expect(Kamel("run", "-n", ns, "files/yaml.yaml", "--name", "untouched").Execute()).To(Succeed())
Consistently(IntegrationPhase(ns, "untouched"), 10*time.Second).Should(BeEmpty())
})

t.Run("Operators run scoped integrations", func(t *testing.T) {
RegisterTestingT(t)

Expect(Kamel("run", "-n", ns, "files/yaml.yaml", "--name", "moving").Execute()).To(Succeed())
Expect(AssignIntegrationToOperator(ns, "moving", "operator-1")).To(Succeed())
Eventually(IntegrationPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning))
Eventually(IntegrationPodPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(corev1.PodRunning))
Eventually(IntegrationLogs(ns, "moving"), TestTimeoutShort).Should(ContainSubstring("Magicstring!"))
})

t.Run("Operators can handoff scoped integrations", func(t *testing.T) {
RegisterTestingT(t)

Expect(AssignIntegrationToOperator(ns, "moving", "operator-2")).To(Succeed())
Eventually(IntegrationPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning))
Expect(Kamel("rebuild", "-n", ns, "moving").Execute()).To(Succeed())
Eventually(IntegrationPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning))
Eventually(IntegrationPodPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(corev1.PodRunning))
Eventually(IntegrationLogs(ns, "moving"), TestTimeoutShort).Should(ContainSubstring("Magicstring!"))
})

t.Run("Operators can be deactivated after completely handing off scoped integrations", func(t *testing.T) {
RegisterTestingT(t)

Expect(ScaleOperator(nsop1, 0)).To(Succeed())
Expect(Kamel("rebuild", "-n", ns, "moving").Execute()).To(Succeed())
Eventually(IntegrationPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning))
Eventually(IntegrationPodPhase(ns, "moving"), TestTimeoutMedium).Should(Equal(corev1.PodRunning))
Eventually(IntegrationLogs(ns, "moving"), TestTimeoutShort).Should(ContainSubstring("Magicstring!"))
Expect(ScaleOperator(nsop1, 1)).To(Succeed())
})

t.Run("Operators can run scoped integrations with fixed image", func(t *testing.T) {
RegisterTestingT(t)

image := IntegrationPodImage(ns, "moving")()
Expect(image).NotTo(BeEmpty())
// Save resources by deleting "moving" integration
Expect(Kamel("delete", "moving", "-n", ns).Execute()).To(Succeed())

Expect(Kamel("run", "-n", ns, "files/yaml.yaml", "--name", "pre-built", "-t", fmt.Sprintf("container.image=%s", image)).Execute()).To(Succeed())
Consistently(IntegrationPhase(ns, "pre-built"), 10*time.Second).Should(BeEmpty())
Expect(AssignIntegrationToOperator(ns, "pre-built", "operator-2")).To(Succeed())
Eventually(IntegrationPhase(ns, "pre-built"), TestTimeoutShort).Should(Equal(camelv1.IntegrationPhaseRunning))
Eventually(IntegrationPodPhase(ns, "pre-built"), TestTimeoutMedium).Should(Equal(corev1.PodRunning))
Eventually(IntegrationLogs(ns, "pre-built"), TestTimeoutShort).Should(ContainSubstring("Magicstring!"))
Expect(Kamel("delete", "pre-built", "-n", ns).Execute()).To(Succeed())
})

t.Run("Operators can run scoped kamelet bindings", func(t *testing.T) {
RegisterTestingT(t)

Expect(Kamel("bind", "-n", ns, "timer-source?message=Hello", "log-sink", "--name", "klb").Execute()).To(Succeed())
Consistently(Integration(ns, "klb"), 10*time.Second).Should(BeNil())

Expect(AssignKameletBindingToOperator(ns, "klb", "operator-1")).To(Succeed())
Eventually(Integration(ns, "klb"), TestTimeoutShort).ShouldNot(BeNil())
Eventually(IntegrationPhase(ns, "klb"), TestTimeoutMedium).Should(Equal(camelv1.IntegrationPhaseRunning))
Eventually(IntegrationPodPhase(ns, "klb"), TestTimeoutMedium).Should(Equal(corev1.PodRunning))
})

})
})

// Clean up
RegisterTestingT(t)
Expect(Kamel("delete", "--all", "-n", ns).Execute()).To(Succeed())
})
}
5 changes: 5 additions & 0 deletions e2e/common/operator_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func TestMetrics(t *testing.T) {
Expect(build).NotTo(BeNil())

t.Run("Build duration metric", func(t *testing.T) {
RegisterTestingT(t)
// Get the duration from the Build status
duration, err := time.ParseDuration(build.Status.Duration)
Expect(err).To(BeNil())
Expand Down Expand Up @@ -130,6 +131,7 @@ func TestMetrics(t *testing.T) {
})

t.Run("Build recovery attempts metric", func(t *testing.T) {
RegisterTestingT(t)
// Check there are no failures reported in the Build status
Expect(build.Status.Failure).To(BeNil())

Expand Down Expand Up @@ -167,6 +169,7 @@ func TestMetrics(t *testing.T) {
})

t.Run("reconciliation duration metric", func(t *testing.T) {
RegisterTestingT(t)
Expect(metrics).To(HaveKeyWithValue("camel_k_reconciliation_duration_seconds",
PointTo(MatchFields(IgnoreExtras, Fields{
"Name": EqualP("camel_k_reconciliation_duration_seconds"),
Expand Down Expand Up @@ -345,6 +348,7 @@ func TestMetrics(t *testing.T) {
})

t.Run("Build queue duration metric", func(t *testing.T) {
RegisterTestingT(t)
var ts1, ts2 time.Time
// The start queuing time is taken from the creation time
ts1 = build.CreationTimestamp.Time
Expand Down Expand Up @@ -401,6 +405,7 @@ func TestMetrics(t *testing.T) {
})

t.Run("Integration first readiness metric", func(t *testing.T) {
RegisterTestingT(t)
var ts1, ts2 time.Time

// The start time is taken from the Integration status initialization timestamp
Expand Down
67 changes: 51 additions & 16 deletions e2e/support/test_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/google/uuid"
"github.com/onsi/gomega"
"github.com/pkg/errors"
"github.com/spf13/cobra"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -422,6 +423,18 @@ func IntegrationCondition(ns string, name string, conditionType v1.IntegrationCo
}
}

func AssignIntegrationToOperator(ns, name, operator string) error {
it := Integration(ns, name)()
if it == nil {
return fmt.Errorf("cannot assign integration %q to operator: integration not found", name)
}
if it.Labels == nil {
it.Labels = make(map[string]string)
}
it.Labels[v1.OperatorIDLabel] = operator
return TestClient().Update(TestContext, it)
}

func Lease(ns string, name string) func() *coordination.Lease {
return func() *coordination.Lease {
lease := coordination.Lease{}
Expand Down Expand Up @@ -710,6 +723,18 @@ func ScaleKameletBinding(ns string, name string, replicas int32) error {
})
}

func AssignKameletBindingToOperator(ns, name, operator string) error {
klb := KameletBinding(ns, name)()
if klb == nil {
return fmt.Errorf("cannot assign kamelet binding %q to operator: kamelet binding not found", name)
}
if klb.Labels == nil {
klb.Labels = make(map[string]string)
}
klb.Labels[v1.OperatorIDLabel] = operator
return TestClient().Update(TestContext, klb)
}

type KitFilter interface {
Match(*v1.IntegrationKit) bool
}
Expand Down Expand Up @@ -1026,6 +1051,18 @@ func PlatformProfile(ns string) func() v1.TraitProfile {
}
}

func AssignPlatformToOperator(ns, operator string) error {
pl := Platform(ns)()
if pl == nil {
return errors.New("cannot assign platform to operator: no platform found")
}
if pl.Labels == nil {
pl.Labels = make(map[string]string)
}
pl.Labels[v1.OperatorIDLabel] = operator
return TestClient().Update(TestContext, pl)
}

func CRDs() func() []metav1.APIResource {
return func() []metav1.APIResource {

Expand Down Expand Up @@ -1098,24 +1135,22 @@ func OperatorTryPodForceKill(ns string, timeSeconds int) {
}
}

func ScaleOperator(ns string, replicas int32) func() error {
return func() error {
operator, err := TestClient().AppsV1().Deployments(ns).Get(TestContext, "camel-k-operator", metav1.GetOptions{})
if err != nil {
return err
}
operator.Spec.Replicas = &replicas
_, err = TestClient().AppsV1().Deployments(ns).Update(TestContext, operator, metav1.UpdateOptions{})
if err != nil {
return err
}
func ScaleOperator(ns string, replicas int32) error {
operator, err := TestClient().AppsV1().Deployments(ns).Get(TestContext, "camel-k-operator", metav1.GetOptions{})
if err != nil {
return err
}
operator.Spec.Replicas = &replicas
_, err = TestClient().AppsV1().Deployments(ns).Update(TestContext, operator, metav1.UpdateOptions{})
if err != nil {
return err
}

if replicas == 0 {
// speedup scale down by killing the pod
OperatorTryPodForceKill(ns, 10)
}
return nil
if replicas == 0 {
// speedup scale down by killing the pod
OperatorTryPodForceKill(ns, 10)
}
return nil
}

func ClusterRole() func() []rbacv1.ClusterRole {
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/camel/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const TraitAnnotationPrefix = "trait.camel.apache.org/"
const (
TraitAnnotationPrefix = "trait.camel.apache.org/"
OperatorIDLabel = "camel.apache.org/operator.id"
)

// ConfigurationSpec --
type ConfigurationSpec struct {
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/build/build_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
Expand Down Expand Up @@ -71,7 +70,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
Named("build-controller").
// Watch for changes to primary resource Build
For(&v1.Build{}, builder.WithPredicates(
predicate.Funcs{
platform.FilteringFuncs{
UpdateFunc: func(e event.UpdateEvent) bool {
oldBuild := e.ObjectOld.(*v1.Build)
newBuild := e.ObjectNew.(*v1.Build)
Expand Down Expand Up @@ -130,6 +129,12 @@ func (r *reconcileBuild) Reconcile(ctx context.Context, request reconcile.Reques
return reconcile.Result{}, err
}

// Only process resources assigned to the operator
if !platform.IsOperatorHandler(&instance) {
rlog.Info("Ignoring request because resource is not assigned to current operator")
return reconcile.Result{}, nil
}

target := instance.DeepCopy()
targetLog := rlog.ForBuild(target)

Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/integration/integration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand Down Expand Up @@ -73,7 +72,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
Named("integration-controller").
// Watch for changes to primary resource Integration
For(&v1.Integration{}, builder.WithPredicates(
predicate.Funcs{
platform.FilteringFuncs{
UpdateFunc: func(e event.UpdateEvent) bool {
oldIntegration := e.ObjectOld.(*v1.Integration)
newIntegration := e.ObjectNew.(*v1.Integration)
Expand Down Expand Up @@ -226,6 +225,12 @@ func (r *reconcileIntegration) Reconcile(ctx context.Context, request reconcile.
return reconcile.Result{}, err
}

// Only process resources assigned to the operator
if !platform.IsOperatorHandler(&instance) {
rlog.Info("Ignoring request because resource is not assigned to current operator")
return reconcile.Result{}, nil
}

target := instance.DeepCopy()
targetLog := rlog.ForIntegration(target)

Expand Down

0 comments on commit e5c4eea

Please sign in to comment.