Skip to content

Commit

Permalink
Add allow-only-ready-replicas flag to only include Ready hashring rep…
Browse files Browse the repository at this point in the history
…licas in the generated hashring ConfigMap.
  • Loading branch information
Michael Burt committed May 23, 2022
1 parent b58820f commit b7f21d0
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -11,6 +11,7 @@ require (
k8s.io/api v0.23.5
k8s.io/apimachinery v0.23.5
k8s.io/client-go v12.0.0+incompatible
k8s.io/kubernetes v1.13.0
)

require (
Expand Down
1 change: 1 addition & 0 deletions go.sum
Expand Up @@ -2760,6 +2760,7 @@ k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iL
k8s.io/kube-openapi v0.0.0-20211109043538-20434351676c/go.mod h1:vHXdDvt9+2spS2Rx9ql3I8tycm3H9FDfdUoIuKCefvw=
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 h1:E3J9oCLlaobFUqsjG9DfKbP2BmgwBL2p7pn0A3dG9W4=
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65/go.mod h1:sX9MT8g7NVZM5lVL/j8QyCCJe8YSMW30QvGZWaCIDIk=
k8s.io/kubernetes v1.13.0 h1:qTfB+u5M92k2fCCCVP2iuhgwwSOv1EkAkvQY1tQODD8=
k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/utils v0.0.0-20190809000727-6c36bc71fc4a/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20191114200735-6ca3b61696b6/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
Expand Down
31 changes: 29 additions & 2 deletions main.go
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

type label = string
Expand Down Expand Up @@ -69,6 +70,7 @@ func main() {
Port int
Scheme string
InternalAddr string
AllowOnlyReadyReplicas bool
ScaleTimeout time.Duration
}{}

Expand All @@ -82,6 +84,7 @@ func main() {
flag.IntVar(&config.Port, "port", defaultPort, "The port on which receive components are listening for write requests")
flag.StringVar(&config.Scheme, "scheme", "http", "The URL scheme on which receive components accept write requests")
flag.StringVar(&config.InternalAddr, "internal-addr", ":8080", "The address on which internal server runs")
flag.BoolVar(&config.AllowOnlyReadyReplicas, "allow-only-ready-replicas", false, "Populate only Ready receiver replicas in the hashring configuration")
flag.DurationVar(&config.ScaleTimeout, "scale-timeout", defaultScaleTimeout, "A timeout to wait for receivers to really start after they report healthy")
flag.Parse()

Expand Down Expand Up @@ -124,6 +127,7 @@ func main() {
scheme: config.Scheme,
labelKey: labelKey,
labelValue: labelValue,
allowOnlyReadyReplicas: config.AllowOnlyReadyReplicas,
scaleTimeout: config.ScaleTimeout,
}
c := newController(klient, logger, opt)
Expand Down Expand Up @@ -305,6 +309,7 @@ type options struct {
scheme string
labelKey string
labelValue string
allowOnlyReadyReplicas bool
scaleTimeout time.Duration
}

Expand Down Expand Up @@ -546,7 +551,7 @@ func (c *controller) sync(ctx context.Context) {
time.Sleep(c.options.scaleTimeout) // Give some time for all replicas before they receive hundreds req/s
}

c.populate(hashrings, statefulsets)
c.populate(ctx, hashrings, statefulsets)

if err := c.saveHashring(ctx, hashrings, cm); err != nil {
c.reconcileErrors.WithLabelValues(save).Inc()
Expand All @@ -565,6 +570,12 @@ func (c controller) waitForPod(ctx context.Context, name string) error {
}
switch pod.Status.Phase {
case corev1.PodRunning:
if c.options.allowOnlyReadyReplicas {
if podutil.IsPodReady(pod) {
return true, nil
}
return false, nil
}
return true, nil
case corev1.PodFailed, corev1.PodPending, corev1.PodSucceeded, corev1.PodUnknown:
return false, nil
Expand All @@ -574,12 +585,28 @@ func (c controller) waitForPod(ctx context.Context, name string) error {
})
}

func (c *controller) populate(hashrings []receive.HashringConfig, statefulsets map[string]*appsv1.StatefulSet) {
//nolint:nestif
func (c *controller) populate(ctx context.Context, hashrings []receive.HashringConfig, statefulsets map[string]*appsv1.StatefulSet) {
for i, h := range hashrings {
if sts, exists := statefulsets[h.Hashring]; exists {
var endpoints []string

for i := 0; i < int(*sts.Spec.Replicas); i++ {
// Do not add a replica to the hashring if pod is not Ready.
if c.options.allowOnlyReadyReplicas {
podName := fmt.Sprintf("%s-%d", sts.Name, i)
pod, err := c.klient.CoreV1().Pods(c.options.namespace).Get(ctx, podName, metav1.GetOptions{})

if kerrors.IsNotFound(err) {
continue
}

if !podutil.IsPodReady(pod) {
level.Warn(c.logger).Log("msg", "failed adding pod to hashring, pod not ready", "pod", podName, "err", err)
continue
}
}

// If cluster domain is empty string we don't want dot after svc.
clusterDomain := ""
if c.options.clusterDomain != "" {
Expand Down

0 comments on commit b7f21d0

Please sign in to comment.