From 070322921d35c781bd0c94a6527dd2a819362210 Mon Sep 17 00:00:00 2001
From: "Lubomir I. Ivanov"
Date: Tue, 23 Feb 2021 03:58:09 +0200
Subject: [PATCH] pkg/kubelet: improve the node informer sync check

GetNode() is called in many places, including a hot loop in
fastStatusUpdateOnce. Having a poll inside it delays the kubelet
/readyz status=200 report.

If a client is available, attempt to wait for the node sync to
complete before starting the list/watch for pods at the apiserver.
---
 pkg/kubelet/config/apiserver.go | 27 +++++++++++++--
 pkg/kubelet/kubelet.go          | 59 +++++++++++++++------------------
 pkg/kubelet/kubelet_getters.go  | 13 +-------
 3 files changed, 52 insertions(+), 47 deletions(-)

diff --git a/pkg/kubelet/config/apiserver.go b/pkg/kubelet/config/apiserver.go
index 2f29ddd576ff..594251e9450b 100644
--- a/pkg/kubelet/config/apiserver.go
+++ b/pkg/kubelet/config/apiserver.go
@@ -17,21 +17,42 @@ limitations under the License.
 package config
 
 import (
-	"k8s.io/api/core/v1"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog"
 	api "k8s.io/kubernetes/pkg/apis/core"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 )
 
+// WaitForAPIServerSyncPeriod is the period between checks for the node list/watch initial sync
+const WaitForAPIServerSyncPeriod = 1 * time.Second
+
 // NewSourceApiserver creates a config source that watches and pulls from the apiserver.
-func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
+func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
 	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
-	newSourceApiserverFromLW(lw, updates)
+
+	// The Reflector responsible for watching pods at the apiserver should be run only after
+	// the node sync with the apiserver has completed.
+	klog.Info("Waiting for node sync before watching apiserver pods")
+	go func() {
+		for {
+			if nodeHasSynced() {
+				klog.V(4).Info("node sync completed")
+				break
+			}
+			time.Sleep(WaitForAPIServerSyncPeriod)
+			klog.V(4).Info("node sync has not completed yet")
+		}
+		klog.Info("Watching apiserver")
+		newSourceApiserverFromLW(lw, updates)
+	}()
 }
 
 // newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index d134d8d03e1e..1d571c33b077 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -128,9 +128,6 @@ const (
 	// Max amount of time to wait for the container runtime to come up.
 	maxWaitForContainerRuntime = 30 * time.Second
 
-	// Max amount of time to wait for node list/watch to initially sync
-	maxWaitForAPIServerSync = 10 * time.Second
-
 	// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
 	nodeStatusUpdateRetry = 5
 
@@ -273,7 +270,7 @@ type Dependencies struct {
 
 // makePodSourceConfig creates a config.PodConfig from the given
 // KubeletConfiguration or returns an error.
-func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
+func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string, nodeHasSynced func() bool) (*config.PodConfig, error) {
 	manifestURLHeader := make(http.Header)
 	if len(kubeCfg.StaticPodURLHeader) > 0 {
 		for k, v := range kubeCfg.StaticPodURLHeader {
@@ -313,11 +310,11 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
 	}
 
 	if kubeDeps.KubeClient != nil {
-		klog.Infof("Watching apiserver")
+		klog.Info("Adding apiserver pod source")
 		if updatechannel == nil {
 			updatechannel = cfg.Channel(kubetypes.ApiserverSource)
 		}
-		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
+		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, updatechannel)
 	}
 	return cfg, nil
 }
@@ -469,9 +466,32 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 		klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
 	}
 
+	var nodeHasSynced cache.InformerSynced
+	var nodeLister corelisters.NodeLister
+
+	// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
+	// If not nil, we are running as part of a cluster and should sync w/API
+	if kubeDeps.KubeClient != nil {
+		kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
+			options.FieldSelector = fields.Set{api.ObjectNameField: string(nodeName)}.String()
+		}))
+		nodeLister = kubeInformers.Core().V1().Nodes().Lister()
+		nodeHasSynced = func() bool {
+			return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
+		}
+		kubeInformers.Start(wait.NeverStop)
+		klog.Info("Attempting to sync node with API server")
+	} else {
+		// we don't have a client to sync!
+		nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
+		nodeLister = corelisters.NewNodeLister(nodeIndexer)
+		nodeHasSynced = func() bool { return true }
+		klog.Info("Kubelet is running in standalone mode, will skip API server sync")
+	}
+
 	if kubeDeps.PodConfig == nil {
 		var err error
-		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
+		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath, nodeHasSynced)
 		if err != nil {
 			return nil, err
 		}
@@ -518,31 +538,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	}
 	serviceLister := corelisters.NewServiceLister(serviceIndexer)
 
-	var nodeHasSynced cache.InformerSynced
-	var nodeLister corelisters.NodeLister
-
-	if kubeDeps.KubeClient != nil {
-		kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
-			options.FieldSelector = fields.Set{api.ObjectNameField: string(nodeName)}.String()
-		}))
-		nodeLister = kubeInformers.Core().V1().Nodes().Lister()
-		nodeHasSynced = func() bool {
-			if kubeInformers.Core().V1().Nodes().Informer().HasSynced() {
-				return true
-			}
-			klog.Infof("kubelet nodes not sync")
-			return false
-		}
-		kubeInformers.Start(wait.NeverStop)
-		klog.Info("Kubelet client is not nil")
-	} else {
-		// we dont have a client to sync!
-		nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
-		nodeLister = corelisters.NewNodeLister(nodeIndexer)
-		nodeHasSynced = func() bool { return true }
-		klog.Info("Kubelet client is nil")
-	}
-
 	// TODO: get the real node object of ourself,
 	// and use the real node name and UID.
 	// TODO: what is namespace for node?
diff --git a/pkg/kubelet/kubelet_getters.go b/pkg/kubelet/kubelet_getters.go
index 0c4c1d31c374..8c9714995368 100644
--- a/pkg/kubelet/kubelet_getters.go
+++ b/pkg/kubelet/kubelet_getters.go
@@ -22,7 +22,6 @@ import (
 	"io/ioutil"
 	"net"
 	"path/filepath"
-	"time"
 
 	cadvisorapiv1 "github.com/google/cadvisor/info/v1"
 	"k8s.io/klog"
@@ -31,7 +30,6 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/kubelet/cm"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -234,15 +232,6 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
 	if kl.kubeClient == nil {
 		return kl.initialNode(context.TODO())
 	}
-	// if we have a valid kube client, we wait for initial lister to sync
-	if !kl.nodeHasSynced() {
-		err := wait.PollImmediate(time.Second, maxWaitForAPIServerSync, func() (bool, error) {
-			return kl.nodeHasSynced(), nil
-		})
-		if err != nil {
-			return nil, fmt.Errorf("nodes have not yet been read at least once, cannot construct node object")
-		}
-	}
 	return kl.nodeLister.Get(string(kl.nodeName))
 }
 
@@ -253,7 +242,7 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
 // zero capacity, and the default labels.
 func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
 	if kl.kubeClient != nil {
-		if n, err := kl.GetNode(); err == nil {
+		if n, err := kl.nodeLister.Get(string(kl.nodeName)); err == nil {
 			return n, nil
 		}
 	}
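
The patch boils down to one pattern: expose the node informer's initial sync as a
cheap nodeHasSynced() probe, let GetNode() read the lister directly, and gate the
pod source behind the probe in a background goroutine so callers are never blocked.
The following self-contained Go sketch illustrates only that pattern; startAfterSync,
waitForSyncPeriod, and the simulated probe are invented for the example and are not
kubelet code.

// sync_gate_sketch.go - illustrative only; not part of the kubelet.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitForSyncPeriod plays the role of WaitForAPIServerSyncPeriod in the patch.
const waitForSyncPeriod = 100 * time.Millisecond

// startAfterSync returns immediately and runs start() in a goroutine once
// hasSynced() reports true, polling at a fixed period. This mirrors how
// NewSourceApiserver defers newSourceApiserverFromLW until the node informer
// has completed its initial list.
func startAfterSync(hasSynced func() bool, start func()) {
	go func() {
		for !hasSynced() {
			time.Sleep(waitForSyncPeriod)
		}
		start()
	}()
}

func main() {
	var synced atomic.Bool

	// Simulate the node informer finishing its initial sync after a delay.
	go func() {
		time.Sleep(300 * time.Millisecond)
		synced.Store(true)
	}()

	done := make(chan struct{})
	startAfterSync(synced.Load, func() {
		// In the kubelet this is where the pod list/watch would start.
		fmt.Println("node synced; starting pod watch")
		close(done)
	})

	// The caller is not blocked, so readiness reporting can proceed.
	fmt.Println("returned immediately")
	<-done
}

Polling a cheap boolean probe in the background keeps the wait out of hot paths such
as fastStatusUpdateOnce, which is why the patch removes the wait.PollImmediate call
from GetNode() and moves the waiting into the goroutine in NewSourceApiserver.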