Skip to content

Commit

Permalink
[CN-1029] Add support for scale subresource
Browse files Browse the repository at this point in the history
  • Loading branch information
dzeromski-hazelcast committed Nov 26, 2023
1 parent 94cf4a0 commit a15310c
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 21 deletions.
9 changes: 9 additions & 0 deletions api/v1alpha1/hazelcast_types.go
Expand Up @@ -1073,6 +1073,14 @@ type SQL struct {

// HazelcastStatus defines the observed state of Hazelcast
type HazelcastStatus struct {
// Number of Hazelcast members in the cluster.
// +optional
ClusterSize int32 `json:"clusterSize"`

// Selector is a label selector used by HorizontalPodAutoscaler to autoscale Hazelcast resource.
// +optional
Selector string `json:"selector"`

// Phase of the Hazelcast cluster
// +optional
Phase Phase `json:"phase,omitempty"`
Expand Down Expand Up @@ -1194,6 +1202,7 @@ type HazelcastClusterStatus struct {

// Hazelcast is the Schema for the hazelcasts API
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.clusterSize,statuspath=.status.clusterSize,selectorpath=.status.selector
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="Current state of the Hazelcast deployment"
// +kubebuilder:printcolumn:name="Members",type="string",JSONPath=".status.hazelcastClusterStatus.readyMembers",description="Current numbers of ready Hazelcast members"
// +kubebuilder:printcolumn:name="Message",type="string",priority=1,JSONPath=".status.message",description="Message for the current Hazelcast Config"
Expand Down
12 changes: 12 additions & 0 deletions config/crd/bases/hazelcast.com_hazelcasts.yaml
Expand Up @@ -1911,6 +1911,10 @@ spec:
status:
description: HazelcastStatus defines the observed state of Hazelcast
properties:
clusterSize:
description: Number of Hazelcast members in the cluster.
format: int32
type: integer
hazelcastClusterStatus:
description: Status of the Hazelcast cluster
properties:
Expand Down Expand Up @@ -2014,9 +2018,17 @@ spec:
- Succeeded
type: string
type: object
selector:
description: Selector is a label selector used by HorizontalPodAutoscaler
to autoscale Hazelcast resource.
type: string
type: object
type: object
served: true
storage: true
subresources:
scale:
labelSelectorPath: .status.selector
specReplicasPath: .spec.clusterSize
statusReplicasPath: .status.clusterSize
status: {}
54 changes: 45 additions & 9 deletions controllers/hazelcast/hazelcast_controller.go
Expand Up @@ -216,11 +216,35 @@ func (r *HazelcastReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return r.update(ctx, h, recoptions.RetryAfter(retryAfter), withHzPhase(hazelcastv1alpha1.Pending))
}

if ok, err := util.CheckIfRunning(ctx, r.Client, req.NamespacedName, *h.Spec.ClusterSize); !ok {
var statefulSet appsv1.StatefulSet
if err := r.Client.Get(ctx, req.NamespacedName, &statefulSet); err != nil {
if errors.IsNotFound(err) {
return r.update(ctx, h, recoptions.RetryAfter(retryAfter),
withHzPhase(hazelcastv1alpha1.Pending),
r.withMemberStatuses(ctx, h, err),
withHzStatefulSet(statefulSet),
)
}
return r.update(ctx, h, recoptions.Error(err),
withHzFailedPhase(err.Error()),
r.withMemberStatuses(ctx, h, err),
withHzStatefulSet(statefulSet),
)
}

if ok, err := util.CheckIfRunning(ctx, r.Client, &statefulSet, *h.Spec.ClusterSize); !ok {
if err == nil {
return r.update(ctx, h, recoptions.RetryAfter(retryAfter), withHzPhase(hazelcastv1alpha1.Pending), r.withMemberStatuses(ctx, h, err))
return r.update(ctx, h, recoptions.RetryAfter(retryAfter),
withHzPhase(hazelcastv1alpha1.Pending),
r.withMemberStatuses(ctx, h, err),
withHzStatefulSet(statefulSet),
)
} else {
return r.update(ctx, h, recoptions.Error(err), withHzFailedPhase(err.Error()), r.withMemberStatuses(ctx, h, err))
return r.update(ctx, h, recoptions.Error(err),
withHzFailedPhase(err.Error()),
r.withMemberStatuses(ctx, h, err),
withHzStatefulSet(statefulSet),
)
}
}

Expand All @@ -229,15 +253,19 @@ func (r *HazelcastReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return r.update(ctx, h, recoptions.RetryAfter(retryAfter),
withHzPhase(hazelcastv1alpha1.Pending),
withHzMessage(err.Error()),
r.withMemberStatuses(ctx, h, nil))
r.withMemberStatuses(ctx, h, nil),
withHzStatefulSet(statefulSet),
)
}
r.statusServiceRegistry.Create(req.NamespacedName, cl, r.Log, r.triggerReconcileChan)

if err = r.ensureClusterActive(ctx, cl, h); err != nil {
logger.Error(err, "Cluster activation attempt after hot restore failed")
return r.update(ctx, h, recoptions.RetryAfter(retryAfter),
withHzPhase(hazelcastv1alpha1.Pending),
r.withMemberStatuses(ctx, h, nil))
r.withMemberStatuses(ctx, h, nil),
withHzStatefulSet(statefulSet),
)
}

if !cl.IsClientConnected() {
Expand All @@ -247,20 +275,26 @@ func (r *HazelcastReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return r.update(ctx, h, recoptions.RetryAfter(retryAfter),
withHzPhase(hazelcastv1alpha1.Pending),
withHzMessage(err.Error()),
r.withMemberStatuses(ctx, h, nil))
r.withMemberStatuses(ctx, h, nil),
withHzStatefulSet(statefulSet),
)
}
return r.update(ctx, h, recoptions.RetryAfter(retryAfter),
withHzPhase(hazelcastv1alpha1.Pending),
withHzMessage("Client is not connected to the cluster!"),
r.withMemberStatuses(ctx, h, nil))
r.withMemberStatuses(ctx, h, nil),
withHzStatefulSet(statefulSet),
)
}

if newExecutorServices != nil {
if !cl.AreAllMembersAccessible() {
return r.update(ctx, h, recoptions.RetryAfter(retryAfter),
withHzPhase(hazelcastv1alpha1.Pending),
withHzMessage("Not all Hazelcast members are accessible!"),
r.withMemberStatuses(ctx, h, nil))
r.withMemberStatuses(ctx, h, nil),
withHzStatefulSet(statefulSet),
)
}
r.addExecutorServices(ctx, cl, newExecutorServices)
}
Expand All @@ -276,7 +310,9 @@ func (r *HazelcastReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return r.update(ctx, h, recoptions.Empty(),
withHzPhase(hazelcastv1alpha1.Running),
withHzMessage(clientConnectionMessage(r.clientRegistry, req)),
r.withMemberStatuses(ctx, h, nil))
r.withMemberStatuses(ctx, h, nil),
withHzStatefulSet(statefulSet),
)
}

func (r *HazelcastReconciler) podUpdates(pod client.Object) []reconcile.Request {
Expand Down
17 changes: 17 additions & 0 deletions controllers/hazelcast/hazelcast_status.go
Expand Up @@ -6,8 +6,10 @@ import (
"strings"

hztypes "github.com/hazelcast/hazelcast-go-client/types"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -45,6 +47,21 @@ func (m withHzMessage) HzStatusApply(hs *hazelcastv1alpha1.HazelcastStatus) {
hs.Message = string(m)
}

type withHzStatefulSet appsv1.StatefulSet

// HzStatusApply propagates selector and cluster size from underlying StatefulSet
func (s withHzStatefulSet) HzStatusApply(status *hazelcastv1alpha1.HazelcastStatus) {
// Retrieve the current number of replicas from the StatefulSet
status.ClusterSize = s.Status.Replicas

// Retrieve the label selectors from the StatefulSet
selector, err := metav1.LabelSelectorAsSelector(s.Spec.Selector)
if err != nil {
return
}
status.Selector = selector.String()
}

type memberStatuses struct {
readyMembers string
readyMembersMap map[hztypes.UUID]*hzclient.MemberData
Expand Down
15 changes: 14 additions & 1 deletion controllers/managementcenter/managementcenter_controller.go
Expand Up @@ -123,7 +123,20 @@ func (r *ManagementCenterReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
}

if ok, err := util.CheckIfRunning(ctx, r.Client, req.NamespacedName, 1); !ok {
var statefulSet appsv1.StatefulSet
if err := r.Client.Get(ctx, req.NamespacedName, &statefulSet); err != nil {
if errors.IsNotFound(err) {
if mc.Status.Phase == hazelcastv1alpha1.McConfiguring {
return update(ctx, r.Client, mc, recoptions.RetryAfter(retryAfter),
withMcPhase(hazelcastv1alpha1.McConfiguring),
)
}
return update(ctx, r.Client, mc, recoptions.RetryAfter(retryAfter), withMcPhase(hazelcastv1alpha1.McPending))
}
return update(ctx, r.Client, mc, recoptions.Error(err), withMcFailedPhase(err.Error()))
}

if ok, err := util.CheckIfRunning(ctx, r.Client, &statefulSet, 1); !ok {
if mc.Status.Phase == hazelcastv1alpha1.McConfiguring {
return update(ctx, r.Client, mc, recoptions.RetryAfter(retryAfter), withMcPhase(hazelcastv1alpha1.McConfiguring))
}
Expand Down
@@ -1,3 +1,4 @@

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
Expand Down Expand Up @@ -2289,6 +2290,10 @@ spec:
status:
description: HazelcastStatus defines the observed state of Hazelcast
properties:
clusterSize:
description: Number of Hazelcast members in the cluster.
format: int32
type: integer
hazelcastClusterStatus:
description: Status of the Hazelcast cluster
properties:
Expand Down Expand Up @@ -2392,11 +2397,19 @@ spec:
- Succeeded
type: string
type: object
selector:
description: Selector is a label selector used by HorizontalPodAutoscaler
to autoscale Hazelcast resource.
type: string
type: object
type: object
served: true
storage: true
subresources:
scale:
labelSelectorPath: .status.selector
specReplicasPath: .spec.clusterSize
statusReplicasPath: .status.clusterSize
status: {}
---
apiVersion: apiextensions.k8s.io/v1
Expand Down
14 changes: 3 additions & 11 deletions internal/util/util.go
Expand Up @@ -89,19 +89,11 @@ func CreateOrGet(ctx context.Context, c client.Client, key client.ObjectKey, obj
}
}

func CheckIfRunning(ctx context.Context, cl client.Client, namespacedName types.NamespacedName, expectedReplicas int32) (bool, error) {
sts := &appsv1.StatefulSet{}
err := cl.Get(ctx, client.ObjectKey{Name: namespacedName.Name, Namespace: namespacedName.Namespace}, sts)
if err != nil {
if kerrors.IsNotFound(err) {
return false, nil
}
return false, err
}
if isStatefulSetReady(sts, expectedReplicas) {
func CheckIfRunning(ctx context.Context, cl client.Client, statefulSet *appsv1.StatefulSet, expectedReplicas int32) (bool, error) {
if isStatefulSetReady(statefulSet, expectedReplicas) {
return true, nil
}
if err := checkPodsForFailure(ctx, cl, sts); err != nil {
if err := checkPodsForFailure(ctx, cl, statefulSet); err != nil {
return false, err
}
return false, nil
Expand Down

0 comments on commit a15310c

Please sign in to comment.