Skip to content

Commit

Permalink
migrate to controller-runtime controllers for k8s watch events
Browse files Browse the repository at this point in the history
This replaces the internal informers and moves to the controller-runtime controllers
setting up Contour to migrate over to the controller-runtime caches.

Updates #2683

Signed-off-by: Steve Sloka <slokas@vmware.com>
  • Loading branch information
stevesloka committed Mar 23, 2021
1 parent acf533c commit 5c18ee2
Show file tree
Hide file tree
Showing 18 changed files with 1,166 additions and 118 deletions.
158 changes: 111 additions & 47 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/projectcontour/contour/internal/health"
"github.com/projectcontour/contour/internal/httpsvc"
"github.com/projectcontour/contour/internal/k8s"
contour_cache "github.com/projectcontour/contour/internal/k8s/cache"
"github.com/projectcontour/contour/internal/metrics"
"github.com/projectcontour/contour/internal/timeout"
"github.com/projectcontour/contour/internal/workgroup"
Expand All @@ -54,6 +55,10 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
controller_config "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
gatewayapi_v1alpha1 "sigs.k8s.io/gateway-api/apis/v1alpha1"
)

// Add RBAC policy to support leader election.
Expand Down Expand Up @@ -419,6 +424,12 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
// the output of the other processors.
dagProcessors = append(dagProcessors, &dag.ListenerProcessor{})

// Setup a Manager
mgr, err := manager.New(controller_config.GetConfigOrDie(), manager.Options{})
if err != nil {
log.Fatal(err, "unable to set up overall controller manager")
}

// Build the core Kubernetes event handler.
eventHandler := &contour.EventHandler{
HoldoffDelay: 100 * time.Millisecond,
Expand All @@ -445,36 +456,64 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

// Wrap eventHandler in a converter for objects from the dynamic client.
// and an EventRecorder which tracks API server events.
dynamicHandler := k8s.DynamicClientHandler{
dynamicHandler := &k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
Next: eventHandler,
Counter: contourMetrics.EventHandlerOperations,
},
Converter: converter,
Logger: log.WithField("context", "dynamicHandler"),
Logger: log.WithField("context", "dynamicHandler"),
}

// Inform on DefaultResources.
for _, r := range k8s.DefaultResources() {
inf, err := clients.InformerForResource(r)
if err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
err = contour_api_v1.AddToScheme(mgr.GetScheme())
if err != nil {
log.Error(err, "unable to add Contour V1 API to scheme.")
os.Exit(1)
}

err = contour_api_v1alpha1.AddToScheme(mgr.GetScheme())
if err != nil {
log.Error(err, "unable to add Contour Alpha1 API to scheme.")
os.Exit(1)
}

// Inform on DefaultResources by setting up each controller and registering the watch event
// with controller-runtime.

inf.AddEventHandler(&dynamicHandler)
// Create and register the service controller with the manager.
if _, err := contour_cache.NewServiceController(mgr, dynamicHandler, log.WithField("context", "service-controller")); err != nil {
log.WithError(err).Fatal("failed to create service-controller")
}

// Create and register the HTTPProxy controller with the manager.
if _, err := contour_cache.NewHTTPProxyController(mgr, dynamicHandler, log.WithField("context", "httpproxy-controller")); err != nil {
log.WithError(err).Fatal("failed to create httpproxy-controller")
}

// Create and register the TLSCertificateDelegation controller with the manager.
if _, err := contour_cache.NewTLSCertificateDelegationController(mgr, dynamicHandler, log.WithField("context", "tlscertificatedelegation-controller")); err != nil {
log.WithError(err).Fatal("failed to create tlscertificatedelegation-controller")
}

// Create and register the ExtensionService controller with the manager.
if _, err := contour_cache.NewExtensionServiceController(mgr, dynamicHandler, log.WithField("context", "extensionservice-controller")); err != nil {
log.WithError(err).Fatal("failed to create extensionservice-controller")
}

// If Ingress v1 resource exist, then add informers to watch, otherwise
// add Ingress v1beta1 informers.
if clients.ResourcesExist(k8s.IngressV1Resources()...) {
for _, r := range k8s.IngressV1Resources() {
if err := informOnResource(clients, r, &dynamicHandler); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
// Create and register the NewIngressV1Controller controller with the manager.
if _, err := contour_cache.NewIngressV1Controller(mgr, dynamicHandler, log.WithField("context", "ingressv1-controller")); err != nil {
log.WithError(err).Fatal("failed to create ingressv1-controller")
}
// Create and register the NewIngressV1Controller controller with the manager.
if _, err := contour_cache.NewIngressClassController(mgr, dynamicHandler, log.WithField("context", "ingressclass-controller")); err != nil {
log.WithError(err).Fatal("failed to create ingressclass-controller")
}
} else {
if err := informOnResource(clients, k8s.IngressV1Beta1Resource(), &dynamicHandler); err != nil {
log.WithError(err).WithField("resource", k8s.IngressV1Beta1Resource()).Fatal("failed to create informer")
// Create and register the NewIngressV1Beta1Controller controller with the manager.
if _, err := contour_cache.NewIngressV1Beta1Controller(mgr, dynamicHandler, log.WithField("context", "ingressv1beta1-controller")); err != nil {
log.WithError(err).Fatal("failed to create ingressv1beta1-controller")
}
}

Expand All @@ -486,46 +525,66 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
// Only inform on GatewayAPI resources if Gateway API is found.
if ctx.Config.GatewayConfig != nil {
if clients.ResourcesExist(k8s.GatewayAPIResources()...) {
for _, r := range k8s.GatewayAPIResources() {
if err := informOnResource(clients, r, &dynamicHandler); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}

err = gatewayapi_v1alpha1.AddToScheme(mgr.GetScheme())
if err != nil {
log.Error(err, "unable to add GatewayAPI to scheme.")
os.Exit(1)
}
// Inform on Namespaces.
if err := informOnResource(clients, k8s.NamespacesResource(), &dynamicHandler); err != nil {
log.WithError(err).WithField("resource", k8s.NamespacesResource()).Fatal("failed to create informer")

// Create and register the NewGatewayController controller with the manager.
if _, err := contour_cache.NewGatewayController(mgr, dynamicHandler, log.WithField("context", "gateway-controller")); err != nil {
log.WithError(err).Fatal("failed to create gateway-controller")
}

// Create and register the NewHTTPRouteController controller with the manager.
if _, err := contour_cache.NewHTTPRouteController(mgr, dynamicHandler, log.WithField("context", "httproute-controller")); err != nil {
log.WithError(err).Fatal("failed to create httproute-controller")
}

// Create and register the NewTLSRouteController controller with the manager.
if _, err := contour_cache.NewTLSRouteController(mgr, dynamicHandler, log.WithField("context", "tlsroute-controller")); err != nil {
log.WithError(err).Fatal("failed to create tlsroute-controller")
}

// Create and register the NewBackendPolicyController controller with the manager.
if _, err := contour_cache.NewBackendPolicyController(mgr, dynamicHandler, log.WithField("context", "backendpolicy-controller")); err != nil {
log.WithError(err).Fatal("failed to create backendpolicy-controller")
}

// Create and register the NewNamespaceController controller with the manager.
if _, err := contour_cache.NewNamespaceController(mgr, dynamicHandler, log.WithField("context", "namespace-controller")); err != nil {
log.WithError(err).Fatal("failed to create namespace-controller")
}
} else {
log.Fatalf("GatewayAPI Gateway configured but APIs not installed in cluster.")
}
}

// Inform on secrets, filtering by root namespaces.
for _, r := range k8s.SecretsResources() {
var handler cache.ResourceEventHandler = &dynamicHandler
var handler cache.ResourceEventHandler = dynamicHandler

// If root namespaces are defined, filter for secrets in only those namespaces.
if len(informerNamespaces) > 0 {
handler = k8s.NewNamespaceFilter(informerNamespaces, &dynamicHandler)
}
// If root namespaces are defined, filter for secrets in only those namespaces.
if len(informerNamespaces) > 0 {
handler = k8s.NewNamespaceFilter(informerNamespaces, dynamicHandler)
}

if err := informOnResource(clients, r, handler); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
// Create and register the NewSecretController controller with the manager.
if _, err := contour_cache.NewSecretController(mgr, handler, log.WithField("context", "secret-controller")); err != nil {
log.WithError(err).Fatal("failed to create secret-controller")
}

// Inform on endpoints.
for _, r := range k8s.EndpointsResources() {
if err := informOnResource(clients, r, &k8s.DynamicClientHandler{
// Create and register the NewEndpointController controller with the manager.
if _, err := contour_cache.NewEndpointController(mgr,
&k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
Next: endpointHandler,
Counter: contourMetrics.EventHandlerOperations,
},
Converter: converter,
Logger: log.WithField("context", "endpointstranslator"),
}); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
},
log.WithField("context", "endpoints-controller")); err != nil {
log.WithError(err).Fatal("failed to create endpoints-controller")
}

// Set up workgroup runner and register informers.
Expand Down Expand Up @@ -637,6 +696,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
log.WithField("loadbalancer-address", lbAddr).Info("Using supplied information for Ingress status")
lbsw.lbStatus <- parseStatusFlag(lbAddr)
} else {

dynamicServiceHandler := k8s.DynamicClientHandler{
Next: &k8s.ServiceStatusLoadBalancerWatcher{
ServiceName: ctx.Config.EnvoyServiceName,
Expand All @@ -647,23 +707,27 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
Logger: log.WithField("context", "serviceStatusLoadBalancerWatcher"),
}

for _, r := range k8s.ServicesResources() {
var handler cache.ResourceEventHandler = &dynamicServiceHandler
var handler cache.ResourceEventHandler = &dynamicServiceHandler

if ctx.Config.EnvoyServiceNamespace != "" {
handler = k8s.NewNamespaceFilter([]string{ctx.Config.EnvoyServiceNamespace}, handler)
}
if ctx.Config.EnvoyServiceNamespace != "" {
handler = k8s.NewNamespaceFilter([]string{ctx.Config.EnvoyServiceNamespace}, handler)
}

if err := informOnResource(clients, r, handler); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
// Create and register the service controller with the manager.
if _, err := contour_cache.NewServiceController(mgr, handler, log.WithField("context", "service-controller")); err != nil {
log.WithError(err).Fatal("failed to create service-controller")
}

log.WithField("envoy-service-name", ctx.Config.EnvoyServiceName).
WithField("envoy-service-namespace", ctx.Config.EnvoyServiceNamespace).
Info("Watching Service for Ingress status")
}

// Start Manager
g.AddContext(func(taskCtx context.Context) error {
return mgr.Start(signals.SetupSignalHandler())
})

g.AddContext(func(taskCtx context.Context) error {
log := log.WithField("context", "xds")

Expand Down
1 change: 1 addition & 0 deletions internal/dag/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
// A KubernetesCache holds Kubernetes objects and associated configuration and produces
// DAG values.
type KubernetesCache struct {

// RootNamespaces specifies the namespaces where root
// HTTPProxies can be defined. If empty, roots can be defined in any
// namespace.
Expand Down
75 changes: 75 additions & 0 deletions internal/k8s/cache/backendpolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright Project Contour Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"context"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
gatewayapi_v1alpha1 "sigs.k8s.io/gateway-api/apis/v1alpha1"
)

type backendPolicyReconciler struct {
client client.Client
eventHandler cache.ResourceEventHandler
logrus.FieldLogger
}

// NewBackendPolicyController creates the backendpolicy controller from mgr. The controller will be pre-configured
// to watch for BackendPolicy objects across all namespaces.
func NewBackendPolicyController(mgr manager.Manager, eventHandler cache.ResourceEventHandler, log logrus.FieldLogger) (controller.Controller, error) {
r := &backendPolicyReconciler{
client: mgr.GetClient(),
eventHandler: eventHandler,
FieldLogger: log,
}
c, err := controller.New("backendpolicy-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return nil, err
}
if err := c.Watch(&source.Kind{Type: &gatewayapi_v1alpha1.BackendPolicy{}}, &handler.EnqueueRequestForObject{}); err != nil {
return nil, err
}
return c, nil
}

func (r *backendPolicyReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {

// Fetch the BackendPolicy from the cache.
gateway := &gatewayapi_v1alpha1.BackendPolicy{}
err := r.client.Get(ctx, request.NamespacedName, gateway)
if errors.IsNotFound(err) {
r.Error(nil, "Could not find BackendPolicy %q in Namespace %q", request.Name, request.Namespace)
return reconcile.Result{}, nil
}

// Check if object is deleted.
if !gateway.ObjectMeta.DeletionTimestamp.IsZero() {
r.eventHandler.OnDelete(gateway)
return reconcile.Result{}, nil
}

// Pass the new changed object off to the eventHandler.
r.eventHandler.OnAdd(gateway)

return reconcile.Result{}, nil
}

0 comments on commit 5c18ee2

Please sign in to comment.