forked from redpanda-data/redpanda
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cluster_controller_configuration_drift.go
212 lines (180 loc) · 8.44 KB
/
cluster_controller_configuration_drift.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
package redpanda
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/networking"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/certmanager"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/configuration"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
)
// Defaults used by the configuration drift reconciler.
const (
// defaultDriftCheckPeriod is the interval returned by getDriftCheckPeriod
// when ClusterConfigurationDriftReconciler.DriftCheckPeriod is nil.
defaultDriftCheckPeriod = 1 * time.Minute
// debugLogLevel is the logr verbosity passed to V() for the start/finish
// messages of the drift reconcile loop.
debugLogLevel = 4
)
// ClusterConfigurationDriftReconciler detects drifts in the cluster configuration and triggers a reconciliation.
//
// A drift is signaled by setting the ClusterConfigured condition of the
// Cluster resource to False (see Reconcile).
type ClusterConfigurationDriftReconciler struct {
// Client is the embedded Kubernetes client; Reconcile uses it to fetch the
// Cluster, to update its status, and passes it to the resource helpers.
client.Client
// Log is the base logger; Reconcile derives a per-request logger from it.
Log logr.Logger
// clusterDomain is passed to the service helpers when computing service
// FQDNs. It is set through WithClusterDomain.
clusterDomain string
// Scheme is handed to the resource helpers built during Reconcile.
Scheme *runtime.Scheme
// DriftCheckPeriod optionally overrides the interval between drift checks;
// when nil, defaultDriftCheckPeriod is used.
DriftCheckPeriod *time.Duration
// AdminAPIClientFactory is called by Reconcile to obtain the admin API
// client used to read the cluster configuration and its schema.
AdminAPIClientFactory adminutils.AdminAPIClientFactory
}
// Reconcile compares the configuration reported by the cluster's admin API
// with the last applied configuration and, when they differ, signals the
// drift by setting the ClusterConfigured condition of the Cluster to False.
//
// The check is skipped and rescheduled after the drift check period when the
// cluster version does not support centralized configuration, the cluster is
// not managed, the ClusterConfigured condition is missing or not True, the
// admin API pre-flight check reports it as unavailable, or no last applied
// configuration is found. A Cluster that no longer exists is ignored.
//
// Errors from the Kubernetes API or the admin API are wrapped and returned.
//
//nolint:funlen // May be broken down
func (r *ClusterConfigurationDriftReconciler) Reconcile(
	ctx context.Context, req ctrl.Request,
) (ctrl.Result, error) {
	logger := r.Log.WithValues("redpandacluster", req.NamespacedName)

	logger.V(debugLogLevel).Info(fmt.Sprintf("Starting configuration drift reconcile loop for %v", req.NamespacedName))
	defer logger.V(debugLogLevel).Info(fmt.Sprintf("Finished configuration drift reconcile loop for %v", req.NamespacedName))

	// requeueAfterPeriod is the common "nothing to do now, look again later" result.
	requeueAfterPeriod := func() (ctrl.Result, error) {
		return ctrl.Result{RequeueAfter: r.getDriftCheckPeriod()}, nil
	}

	rpCluster := &redpandav1alpha1.Cluster{}
	if err := r.Get(ctx, req.NamespacedName, rpCluster); err != nil {
		if apierrors.IsNotFound(err) {
			// The resource is gone: nothing to check and nothing to requeue.
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, fmt.Errorf("unable to retrieve Cluster resource: %w", err)
	}

	// Drift detection only applies to managed clusters whose version supports
	// centralized configuration.
	if !featuregates.CentralizedConfiguration(rpCluster.Spec.Version) || !isRedpandaClusterManaged(logger, rpCluster) {
		return requeueAfterPeriod()
	}

	configured := rpCluster.Status.GetCondition(redpandav1alpha1.ClusterConfiguredConditionType)
	if configured == nil || configured.Status != corev1.ConditionTrue {
		// The condition is not True (e.g. a configuration drift was already
		// signaled), so there is nothing to check right now.
		return requeueAfterPeriod()
	}

	// Let at least one full drift check period elapse since the condition
	// last changed before looking for drifts.
	now := time.Now()
	earliestCheck := configured.LastTransitionTime.Time.Add(r.getDriftCheckPeriod())
	if earliestCheck.After(now) {
		return ctrl.Result{RequeueAfter: earliestCheck.Sub(now)}, nil
	}

	// Pre-check before contacting the admin API to exclude errors.
	available, err := adminutils.IsAvailableInPreFlight(ctx, r, rpCluster)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("could not perform pre-flight check for admin API availability: %w", err)
	}
	if !available {
		return requeueAfterPeriod()
	}

	// Rebuild the resource helpers needed to locate the last applied
	// configuration and to create an admin API client.
	ports := networking.NewRedpandaPorts(rpCluster)
	headlessPorts := collectHeadlessPorts(ports)
	clusterPorts := collectClusterPorts(ports, rpCluster)
	headlessSvc := resources.NewHeadlessService(r.Client, rpCluster, r.Scheme, headlessPorts, logger)
	clusterSvc := resources.NewClusterService(r.Client, rpCluster, r.Scheme, clusterPorts, logger)

	// The super user keys keep their zero value unless SASL is enabled and the
	// corresponding component is configured.
	var proxySuKey, schemaRegistrySuKey types.NamespacedName
	if rpCluster.Spec.EnableSASL && rpCluster.PandaproxyAPIInternal() != nil {
		proxySuKey = resources.NewSuperUsers(r.Client, rpCluster, r.Scheme, resources.ScramPandaproxyUsername, resources.PandaProxySuffix, logger).Key()
	}
	if rpCluster.Spec.EnableSASL && rpCluster.Spec.Configuration.SchemaRegistry != nil {
		schemaRegistrySuKey = resources.NewSuperUsers(r.Client, rpCluster, r.Scheme, resources.ScramSchemaRegistryUsername, resources.SchemaRegistrySuffix, logger).Key()
	}

	pki := certmanager.NewPki(r.Client, rpCluster, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), clusterSvc.ServiceFQDN(r.clusterDomain), r.Scheme, logger)
	configMap := resources.NewConfigMap(r.Client, rpCluster, r.Scheme, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), proxySuKey, schemaRegistrySuKey, logger)

	lastApplied, found, err := configMap.GetLastAppliedConfigurationFromCluster(ctx)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("could not get last applied configuration to check drifts: %w", err)
	}
	if !found {
		return requeueAfterPeriod()
	}

	adminAPI, err := r.AdminAPIClientFactory(ctx, r, rpCluster, headlessSvc.HeadlessServiceFQDN(r.clusterDomain), pki.AdminAPIConfigProvider())
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("could not get admin API to check drifts on the cluster: %w", err)
	}

	schema, err := adminAPI.ClusterConfigSchema(ctx)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("could not get cluster schema to check drifts: %w", err)
	}

	liveConfig, err := adminAPI.Config(ctx)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("could not get cluster configuration to check drifts: %w", err)
	}

	// Since config is in sync, the desired configuration is assumed to be
	// equal to the last applied one and to contain no invalid properties, so
	// the last applied configuration is used for both inputs of the merge.
	drift := configuration.ThreeWayMerge(logger, lastApplied, liveConfig, lastApplied, nil, schema)
	if drift.Empty() {
		// Nothing to do, everything in sync.
		return requeueAfterPeriod()
	}

	logger.Info("Detected configuration drift in the cluster")

	// Signal drift by setting the condition to False.
	rpCluster.Status.SetCondition(
		redpandav1alpha1.ClusterConfiguredConditionType,
		corev1.ConditionFalse,
		redpandav1alpha1.ClusterConfiguredReasonDrift,
		"Drift detected by periodic check",
	)
	if updateErr := r.Status().Update(ctx, rpCluster); updateErr != nil {
		return ctrl.Result{}, fmt.Errorf("could not patch cluster to signal a configuration drift: %w", updateErr)
	}

	return requeueAfterPeriod()
}
// SetupWithManager registers the reconciler with mgr as a controller for
// Cluster resources. Events are filtered through createOrDeleteEventFilter
// before they reach Reconcile. It returns the error reported while
// completing the controller registration, if any.
func (r *ClusterConfigurationDriftReconciler) SetupWithManager(
	mgr ctrl.Manager,
) error {
	driftController := ctrl.NewControllerManagedBy(mgr).For(&redpandav1alpha1.Cluster{})
	return driftController.WithEventFilter(createOrDeleteEventFilter{}).Complete(r)
}
// WithClusterDomain stores clusterDomain on the reconciler and returns the
// same reconciler, so the call can be chained with other setup calls.
func (r *ClusterConfigurationDriftReconciler) WithClusterDomain(
	clusterDomain string,
) *ClusterConfigurationDriftReconciler {
	r.clusterDomain = clusterDomain

	return r
}
// getDriftCheckPeriod returns the interval between drift checks: the value
// pointed to by DriftCheckPeriod when it is set, defaultDriftCheckPeriod
// otherwise.
func (r *ClusterConfigurationDriftReconciler) getDriftCheckPeriod() time.Duration {
	period := defaultDriftCheckPeriod
	if override := r.DriftCheckPeriod; override != nil {
		period = *override
	}
	return period
}
// createOrDeleteEventFilter is an event filter that lets through only the
// creation and deletion events of a cluster, which keeps the controller
// independent of changes to the resources.
// Note: a "create" event is also fired for existing resources when the controller starts up.
type createOrDeleteEventFilter struct{}

// Create accepts every creation event.
// It is implemented for compatibility with predicate.Predicate.
func (createOrDeleteEventFilter) Create(_ event.CreateEvent) bool { return true }

// Delete accepts every deletion event.
// It is implemented for compatibility with predicate.Predicate.
func (createOrDeleteEventFilter) Delete(_ event.DeleteEvent) bool { return true }

// Update rejects every update event.
// It is implemented for compatibility with predicate.Predicate.
func (createOrDeleteEventFilter) Update(_ event.UpdateEvent) bool { return false }

// Generic rejects every generic event.
// It is implemented for compatibility with predicate.Predicate.
func (createOrDeleteEventFilter) Generic(_ event.GenericEvent) bool { return false }