Automated cherry pick of #99336: pkg/kubelet: improve the node informer sync check #101343

Merged
27 changes: 24 additions & 3 deletions pkg/kubelet/config/apiserver.go
@@ -17,21 +17,42 @@ limitations under the License.
package config

import (
"k8s.io/api/core/v1"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
api "k8s.io/kubernetes/pkg/apis/core"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)

// WaitForAPIServerSyncPeriod is the period between checks for the node list/watch initial sync
const WaitForAPIServerSyncPeriod = 1 * time.Second

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)

// The Reflector responsible for watching pods at the apiserver should be run only after
// the node sync with the apiserver has completed.
klog.Info("Waiting for node sync before watching apiserver pods")
go func() {
for {
if nodeHasSynced() {
klog.V(4).Info("node sync completed")
break
}
time.Sleep(WaitForAPIServerSyncPeriod)
klog.V(4).Info("node sync has not completed yet")
}
klog.Info("Watching apiserver")
newSourceApiserverFromLW(lw, updates)
}()
}

// newSourceApiserverFromLW creates a config source that watches and pulls from the apiserver.
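The goroutine added above defers the pod Reflector until the node informer reports synced, polling once per WaitForAPIServerSyncPeriod instead of blocking the caller. Below is a minimal, runnable sketch of that pattern; waitForNodeSync, hasSynced, and startWatching are illustrative names, not identifiers from the PR.

    package main

    import (
        "time"

        "k8s.io/klog"
    )

    // waitForNodeSync polls hasSynced until it reports true, then invokes
    // startWatching. This mirrors the goroutine added to NewSourceApiserver:
    // the caller returns immediately, and the pod watch begins only after
    // the node informer has completed its initial List.
    func waitForNodeSync(hasSynced func() bool, startWatching func()) {
        go func() {
            for !hasSynced() {
                klog.V(4).Info("node sync has not completed yet")
                time.Sleep(1 * time.Second) // WaitForAPIServerSyncPeriod in the PR
            }
            klog.Info("Watching apiserver")
            startWatching()
        }()
    }

    func main() {
        deadline := time.Now().Add(3 * time.Second)
        waitForNodeSync(
            func() bool { return time.Now().After(deadline) }, // stand-in sync check
            func() { klog.Info("starting pod reflector") },    // stand-in watch start
        )
        time.Sleep(5 * time.Second) // keep main alive so the goroutine can fire
    }

Because the poll runs in its own goroutine, a node informer that never syncs delays pod admission but no longer blocks kubelet construction.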
59 changes: 27 additions & 32 deletions pkg/kubelet/kubelet.go
@@ -128,9 +128,6 @@ const (
// Max amount of time to wait for the container runtime to come up.
maxWaitForContainerRuntime = 30 * time.Second

// Max amount of time to wait for node list/watch to initially sync
maxWaitForAPIServerSync = 10 * time.Second

// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5

@@ -273,7 +270,7 @@ type Dependencies struct {

// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string) (*config.PodConfig, error) {
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, bootstrapCheckpointPath string, nodeHasSynced func() bool) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
@@ -313,11 +310,11 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
}

if kubeDeps.KubeClient != nil {
klog.Infof("Watching apiserver")
klog.Info("Adding apiserver pod source")
if updatechannel == nil {
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
}
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, updatechannel)
}
return cfg, nil
}
@@ -469,9 +466,32 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
}

var nodeHasSynced cache.InformerSynced
var nodeLister corelisters.NodeLister

// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
// If not nil, we are running as part of a cluster and should sync w/API
if kubeDeps.KubeClient != nil {
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{api.ObjectNameField: string(nodeName)}.String()
}))
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
nodeHasSynced = func() bool {
return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
}
kubeInformers.Start(wait.NeverStop)
klog.Info("Attempting to sync node with API server")
} else {
// we don't have a client to sync!
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeLister = corelisters.NewNodeLister(nodeIndexer)
nodeHasSynced = func() bool { return true }
klog.Info("Kubelet is running in standalone mode, will skip API server sync")
}

if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath, nodeHasSynced)
if err != nil {
return nil, err
}
@@ -518,31 +538,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
serviceLister := corelisters.NewServiceLister(serviceIndexer)

var nodeHasSynced cache.InformerSynced
var nodeLister corelisters.NodeLister

if kubeDeps.KubeClient != nil {
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{api.ObjectNameField: string(nodeName)}.String()
}))
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
nodeHasSynced = func() bool {
if kubeInformers.Core().V1().Nodes().Informer().HasSynced() {
return true
}
klog.Infof("kubelet nodes not sync")
return false
}
kubeInformers.Start(wait.NeverStop)
klog.Info("Kubelet client is not nil")
} else {
// we dont have a client to sync!
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeLister = corelisters.NewNodeLister(nodeIndexer)
nodeHasSynced = func() bool { return true }
klog.Info("Kubelet client is nil")
}

// TODO: get the real node object of ourself,
// and use the real node name and UID.
// TODO: what is namespace for node?
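The informer block moved toward the top of NewMainKubelet builds a shared informer factory whose List/Watch is narrowed to the kubelet's own Node object by a field selector, so nodeHasSynced exists before makePodSourceConfig needs it. A sketch of that construction follows; buildNodeLister is a hypothetical helper, and the literal "metadata.name" stands in for api.ObjectNameField.

    package kubeletsketch

    import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/fields"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/informers"
        clientset "k8s.io/client-go/kubernetes"
        corelisters "k8s.io/client-go/listers/core/v1"
        "k8s.io/client-go/tools/cache"
    )

    // buildNodeLister returns a NodeLister plus a sync check scoped to a
    // single node. Without a client (standalone mode) it returns an empty
    // static lister whose sync check is always true, matching the else
    // branch in NewMainKubelet.
    func buildNodeLister(client clientset.Interface, nodeName string) (corelisters.NodeLister, func() bool) {
        if client == nil {
            indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
            return corelisters.NewNodeLister(indexer), func() bool { return true }
        }
        // The field selector limits the informer's List/Watch to exactly
        // one object: the Node this kubelet runs on.
        factory := informers.NewSharedInformerFactoryWithOptions(client, 0,
            informers.WithTweakListOptions(func(options *metav1.ListOptions) {
                options.FieldSelector = fields.OneTermEqualSelector("metadata.name", nodeName).String()
            }))
        nodes := factory.Core().V1().Nodes()
        factory.Start(wait.NeverStop)
        return nodes.Lister(), nodes.Informer().HasSynced
    }

A resync period of 0 disables periodic relisting; after the initial List, the watch keeps the single cached Node current.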
13 changes: 1 addition & 12 deletions pkg/kubelet/kubelet_getters.go
@@ -22,7 +22,6 @@ import (
"io/ioutil"
"net"
"path/filepath"
"time"

cadvisorapiv1 "github.com/google/cadvisor/info/v1"
"k8s.io/klog"
@@ -31,7 +30,6 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -234,15 +232,6 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
if kl.kubeClient == nil {
return kl.initialNode(context.TODO())
}
// if we have a valid kube client, we wait for initial lister to sync
if !kl.nodeHasSynced() {
err := wait.PollImmediate(time.Second, maxWaitForAPIServerSync, func() (bool, error) {
return kl.nodeHasSynced(), nil
})
if err != nil {
return nil, fmt.Errorf("nodes have not yet been read at least once, cannot construct node object")
}
}
return kl.nodeLister.Get(string(kl.nodeName))
}

@@ -253,7 +242,7 @@ func (kl *Kubelet) GetNode() (*v1.Node, error) {
// zero capacity, and the default labels.
func (kl *Kubelet) getNodeAnyWay() (*v1.Node, error) {
if kl.kubeClient != nil {
if n, err := kl.GetNode(); err == nil {
if n, err := kl.nodeLister.Get(string(kl.nodeName)); err == nil {
return n, nil
}
}
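With the wait.PollImmediate loop removed, GetNode returns whatever the lister currently holds (the pod source now waits for node sync itself), and getNodeAnyWay reads the lister directly so it can fall back to the synthetic initial node without ever blocking. A hedged sketch of that fallback shape; nodeAnyWay is a hypothetical free function and initialNode stands in for the kubelet's default-node constructor.

    package kubeletsketch

    import (
        v1 "k8s.io/api/core/v1"
        corelisters "k8s.io/client-go/listers/core/v1"
    )

    // nodeAnyWay prefers the informer-backed lister but never waits for
    // sync; on any lookup error it falls back to a synthesized node, the
    // shape getNodeAnyWay has after this change.
    func nodeAnyWay(lister corelisters.NodeLister, name string, initialNode func() (*v1.Node, error)) (*v1.Node, error) {
        if n, err := lister.Get(name); err == nil {
            return n, nil
        }
        return initialNode()
    }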