From b7f21d00de1b9bf0067f12bf9d54b047b62dd122 Mon Sep 17 00:00:00 2001 From: Michael Burt Date: Mon, 23 May 2022 14:17:34 -0600 Subject: [PATCH] Add allow-only-ready-replicas flag to only include Ready hashring replicas in the generated hashring ConfigMap. --- go.mod | 1 + go.sum | 1 + main.go | 31 +++++++++++++++++++++++++++++-- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 4ab2798..25a0aeb 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( k8s.io/api v0.23.5 k8s.io/apimachinery v0.23.5 k8s.io/client-go v12.0.0+incompatible + k8s.io/kubernetes v1.13.0 ) require ( diff --git a/go.sum b/go.sum index f8821bb..4301761 100644 --- a/go.sum +++ b/go.sum @@ -2760,6 +2760,7 @@ k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iL k8s.io/kube-openapi v0.0.0-20211109043538-20434351676c/go.mod h1:vHXdDvt9+2spS2Rx9ql3I8tycm3H9FDfdUoIuKCefvw= k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 h1:E3J9oCLlaobFUqsjG9DfKbP2BmgwBL2p7pn0A3dG9W4= k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65/go.mod h1:sX9MT8g7NVZM5lVL/j8QyCCJe8YSMW30QvGZWaCIDIk= +k8s.io/kubernetes v1.13.0 h1:qTfB+u5M92k2fCCCVP2iuhgwwSOv1EkAkvQY1tQODD8= k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= k8s.io/utils v0.0.0-20190809000727-6c36bc71fc4a/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= k8s.io/utils v0.0.0-20191114200735-6ca3b61696b6/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= diff --git a/main.go b/main.go index e331e9b..c086979 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" ) type label = string @@ -69,6 +70,7 @@ func main() { Port int Scheme string InternalAddr string + AllowOnlyReadyReplicas bool ScaleTimeout time.Duration }{} @@ -82,6 +84,7 @@ func main() { flag.IntVar(&config.Port, "port", defaultPort, "The port on which receive components are listening for write requests") flag.StringVar(&config.Scheme, "scheme", "http", "The URL scheme on which receive components accept write requests") flag.StringVar(&config.InternalAddr, "internal-addr", ":8080", "The address on which internal server runs") + flag.BoolVar(&config.AllowOnlyReadyReplicas, "allow-only-ready-replicas", false, "Populate only Ready receiver replicas in the hashring configuration") flag.DurationVar(&config.ScaleTimeout, "scale-timeout", defaultScaleTimeout, "A timeout to wait for receivers to really start after they report healthy") flag.Parse() @@ -124,6 +127,7 @@ func main() { scheme: config.Scheme, labelKey: labelKey, labelValue: labelValue, + allowOnlyReadyReplicas: config.AllowOnlyReadyReplicas, scaleTimeout: config.ScaleTimeout, } c := newController(klient, logger, opt) @@ -305,6 +309,7 @@ type options struct { scheme string labelKey string labelValue string + allowOnlyReadyReplicas bool scaleTimeout time.Duration } @@ -546,7 +551,7 @@ func (c *controller) sync(ctx context.Context) { time.Sleep(c.options.scaleTimeout) // Give some time for all replicas before they receive hundreds req/s } - c.populate(hashrings, statefulsets) + c.populate(ctx, hashrings, statefulsets) if err := c.saveHashring(ctx, hashrings, cm); err != nil { c.reconcileErrors.WithLabelValues(save).Inc() @@ -565,6 +570,12 @@ func (c controller) waitForPod(ctx context.Context, name string) error { } switch pod.Status.Phase { case corev1.PodRunning: + if c.options.allowOnlyReadyReplicas { + if podutil.IsPodReady(pod) { + return true, nil + } + return false, nil + } return true, nil case corev1.PodFailed, corev1.PodPending, corev1.PodSucceeded, corev1.PodUnknown: return false, nil @@ -574,12 +585,28 @@ func (c controller) waitForPod(ctx context.Context, name string) error { }) } -func (c *controller) populate(hashrings []receive.HashringConfig, statefulsets map[string]*appsv1.StatefulSet) { +//nolint:nestif +func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string]*appsv1.StatefulSet) { for i, h := range hashrings { if sts, exists := statefulsets[h.Hashring]; exists { var endpoints []string for i := 0; i < int(*sts.Spec.Replicas); i++ { + // Do not add a replica to the hashring if pod is not Ready. + if c.options.allowOnlyReadyReplicas { + podName := fmt.Sprintf("%s-%d", sts.Name, i) + pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{}) + + if kerrors.IsNotFound(err) { + continue + } + + if !podutil.IsPodReady(pod) { + level.Warn(c.logger).Log("msg", "failed adding pod to hashring, pod not ready", "pod", podName, "err", err) + continue + } + } + // If cluster domain is empty string we don't want dot after svc. clusterDomain := "" if c.options.clusterDomain != "" {