Skip to content

Commit

Permalink
Merge pull request #122979 from fatsheep9146/pkg-proxy-support-contex…
Browse files Browse the repository at this point in the history
…tual-logging

Migrate `pkg/proxy` to contextual logging: Part 1
  • Loading branch information
k8s-ci-robot committed Apr 23, 2024
2 parents 2806ffe + be4535b commit 8dd9d1a
Show file tree
Hide file tree
Showing 19 changed files with 542 additions and 420 deletions.
57 changes: 30 additions & 27 deletions cmd/kube-proxy/app/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package app

import (
"context"
"errors"
"os"
"strconv"
Expand All @@ -33,30 +34,30 @@ import (
// https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
type Conntracker interface {
// SetMax adjusts nf_conntrack_max.
SetMax(max int) error
SetMax(ctx context.Context, max int) error
// SetTCPEstablishedTimeout adjusts nf_conntrack_tcp_timeout_established.
SetTCPEstablishedTimeout(seconds int) error
SetTCPEstablishedTimeout(ctx context.Context, seconds int) error
// SetTCPCloseWaitTimeout adjusts nf_conntrack_tcp_timeout_close_wait.
SetTCPCloseWaitTimeout(seconds int) error
SetTCPCloseWaitTimeout(ctx context.Context, seconds int) error
// SetTCPBeLiberal adjusts nf_conntrack_tcp_be_liberal.
SetTCPBeLiberal(value int) error
SetTCPBeLiberal(ctx context.Context, value int) error
// SetUDPTimeout adjusts nf_conntrack_udp_timeout.
SetUDPTimeout(seconds int) error
SetUDPTimeout(ctx context.Context, seconds int) error
// SetUDPStreamTimeout adjusts nf_conntrack_udp_timeout_stream.
SetUDPStreamTimeout(seconds int) error
SetUDPStreamTimeout(ctx context.Context, seconds int) error
}

type realConntracker struct {
logger klog.Logger
}

var errReadOnlySysFS = errors.New("readOnlySysFS")

func (rct realConntracker) SetMax(max int) error {
if err := rct.setIntSysCtl("nf_conntrack_max", max); err != nil {
func (rct realConntracker) SetMax(ctx context.Context, max int) error {
logger := klog.FromContext(ctx)
if err := rct.setIntSysCtl(ctx, "nf_conntrack_max", max); err != nil {
return err
}
rct.logger.Info("Setting nf_conntrack_max", "nfConntrackMax", max)
logger.Info("Setting nf_conntrack_max", "nfConntrackMax", max)

// Linux does not support writing to /sys/module/nf_conntrack/parameters/hashsize
// when the writer process is not in the initial network namespace
Expand All @@ -79,44 +80,45 @@ func (rct realConntracker) SetMax(max int) error {
// don't set conntrack hashsize and return a special error
// errReadOnlySysFS here. The caller should deal with
// errReadOnlySysFS differently.
writable, err := rct.isSysFSWritable()
writable, err := rct.isSysFSWritable(ctx)
if err != nil {
return err
}
if !writable {
return errReadOnlySysFS
}
// TODO: generify this and sysctl to a new sysfs.WriteInt()
rct.logger.Info("Setting conntrack hashsize", "conntrackHashsize", max/4)
logger.Info("Setting conntrack hashsize", "conntrackHashsize", max/4)
return writeIntStringFile("/sys/module/nf_conntrack/parameters/hashsize", max/4)
}

func (rct realConntracker) SetTCPEstablishedTimeout(seconds int) error {
return rct.setIntSysCtl("nf_conntrack_tcp_timeout_established", seconds)
func (rct realConntracker) SetTCPEstablishedTimeout(ctx context.Context, seconds int) error {
return rct.setIntSysCtl(ctx, "nf_conntrack_tcp_timeout_established", seconds)
}

func (rct realConntracker) SetTCPCloseWaitTimeout(seconds int) error {
return rct.setIntSysCtl("nf_conntrack_tcp_timeout_close_wait", seconds)
func (rct realConntracker) SetTCPCloseWaitTimeout(ctx context.Context, seconds int) error {
return rct.setIntSysCtl(ctx, "nf_conntrack_tcp_timeout_close_wait", seconds)
}

func (rct realConntracker) SetTCPBeLiberal(value int) error {
return rct.setIntSysCtl("nf_conntrack_tcp_be_liberal", value)
func (rct realConntracker) SetTCPBeLiberal(ctx context.Context, value int) error {
return rct.setIntSysCtl(ctx, "nf_conntrack_tcp_be_liberal", value)
}

func (rct realConntracker) SetUDPTimeout(seconds int) error {
return rct.setIntSysCtl("nf_conntrack_udp_timeout", seconds)
func (rct realConntracker) SetUDPTimeout(ctx context.Context, seconds int) error {
return rct.setIntSysCtl(ctx, "nf_conntrack_udp_timeout", seconds)
}

func (rct realConntracker) SetUDPStreamTimeout(seconds int) error {
return rct.setIntSysCtl("nf_conntrack_udp_timeout_stream", seconds)
func (rct realConntracker) SetUDPStreamTimeout(ctx context.Context, seconds int) error {
return rct.setIntSysCtl(ctx, "nf_conntrack_udp_timeout_stream", seconds)
}

func (rct realConntracker) setIntSysCtl(name string, value int) error {
func (rct realConntracker) setIntSysCtl(ctx context.Context, name string, value int) error {
logger := klog.FromContext(ctx)
entry := "net/netfilter/" + name

sys := sysctl.New()
if val, _ := sys.GetSysctl(entry); val != value {
rct.logger.Info("Set sysctl", "entry", entry, "value", value)
logger.Info("Set sysctl", "entry", entry, "value", value)
if err := sys.SetSysctl(entry, value); err != nil {
return err
}
Expand All @@ -125,13 +127,14 @@ func (rct realConntracker) setIntSysCtl(name string, value int) error {
}

// isSysFSWritable checks /proc/mounts to see whether sysfs is 'rw' or not.
func (rct realConntracker) isSysFSWritable() (bool, error) {
func (rct realConntracker) isSysFSWritable(ctx context.Context) (bool, error) {
logger := klog.FromContext(ctx)
const permWritable = "rw"
const sysfsDevice = "sysfs"
m := mount.New("" /* default mount path */)
mountPoints, err := m.List()
if err != nil {
rct.logger.Error(err, "Failed to list mount points")
logger.Error(err, "Failed to list mount points")
return false, err
}

Expand All @@ -143,7 +146,7 @@ func (rct realConntracker) isSysFSWritable() (bool, error) {
if len(mountPoint.Opts) > 0 && mountPoint.Opts[0] == permWritable {
return true, nil
}
rct.logger.Error(nil, "Sysfs is not writable", "mountPoint", mountPoint, "mountOptions", mountPoint.Opts)
logger.Error(nil, "Sysfs is not writable", "mountPoint", mountPoint, "mountOptions", mountPoint.Opts)
return false, errReadOnlySysFS
}

Expand Down
76 changes: 40 additions & 36 deletions cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func init() {

// proxyRun defines the interface to run a specified ProxyServer
type proxyRun interface {
Run() error
Run(ctx context.Context) error
}

// Options contains everything necessary to create and run a proxy server.
Expand Down Expand Up @@ -371,20 +371,20 @@ func (o *Options) Validate() error {
}

// Run runs the specified ProxyServer.
func (o *Options) Run() error {
func (o *Options) Run(ctx context.Context) error {
defer close(o.errCh)
if len(o.WriteConfigTo) > 0 {
return o.writeConfigFile()
}

err := platformCleanup(o.config.Mode, o.CleanupAndExit)
err := platformCleanup(ctx, o.config.Mode, o.CleanupAndExit)
if o.CleanupAndExit {
return err
}
// We ignore err otherwise; the cleanup is best-effort, and the backends will have
// logged messages if they failed in interesting ways.

proxyServer, err := newProxyServer(o.logger, o.config, o.master, o.InitAndExit)
proxyServer, err := newProxyServer(ctx, o.config, o.master, o.InitAndExit)
if err != nil {
return err
}
Expand All @@ -393,19 +393,19 @@ func (o *Options) Run() error {
}

o.proxyServer = proxyServer
return o.runLoop()
return o.runLoop(ctx)
}

// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
func (o *Options) runLoop(ctx context.Context) error {
if o.watcher != nil {
o.watcher.Run()
}

// run the proxy in goroutine
go func() {
err := o.proxyServer.Run()
err := o.proxyServer.Run(ctx)
o.errCh <- err
}()

Expand Down Expand Up @@ -554,7 +554,7 @@ with the apiserver API to configure the proxy.`,
}
// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
if err := opts.Run(); err != nil {
if err := opts.Run(context.Background()); err != nil {
opts.logger.Error(err, "Error running ProxyServer")
return err
}
Expand Down Expand Up @@ -597,15 +597,14 @@ type ProxyServer struct {
podCIDRs []string // only used for LocalModeNodeCIDR

Proxier proxy.Provider

logger klog.Logger
}

// newProxyServer creates a ProxyServer based on the given config
func newProxyServer(logger klog.Logger, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
logger := klog.FromContext(ctx)

s := &ProxyServer{
Config: config,
logger: logger,
}

cz, err := configz.New(kubeproxyconfig.GroupName)
Expand All @@ -623,13 +622,13 @@ func newProxyServer(logger klog.Logger, config *kubeproxyconfig.KubeProxyConfigu
return nil, err
}

s.Client, err = createClient(logger, config.ClientConnection, master)
s.Client, err = createClient(ctx, config.ClientConnection, master)
if err != nil {
return nil, err
}

rawNodeIPs := getNodeIPs(logger, s.Client, s.Hostname)
s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(logger, rawNodeIPs, config.BindAddress)
rawNodeIPs := getNodeIPs(ctx, s.Client, s.Hostname)
s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(ctx, rawNodeIPs, config.BindAddress)

if len(config.NodePortAddresses) == 1 && config.NodePortAddresses[0] == kubeproxyconfig.NodePortAddressesPrimary {
var nodePortAddresses []string
Expand All @@ -656,7 +655,7 @@ func newProxyServer(logger klog.Logger, config *kubeproxyconfig.KubeProxyConfigu
s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
}

err = s.platformSetup()
err = s.platformSetup(ctx)
if err != nil {
return nil, err
}
Expand All @@ -666,7 +665,7 @@ func newProxyServer(logger klog.Logger, config *kubeproxyconfig.KubeProxyConfigu
logger.Error(err, "Kube-proxy configuration may be incomplete or incorrect")
}

ipv4Supported, ipv6Supported, dualStackSupported, err := s.platformCheckSupported()
ipv4Supported, ipv6Supported, dualStackSupported, err := s.platformCheckSupported(ctx)
if err != nil {
return nil, err
} else if (s.PrimaryIPFamily == v1.IPv4Protocol && !ipv4Supported) || (s.PrimaryIPFamily == v1.IPv6Protocol && !ipv6Supported) {
Expand All @@ -685,7 +684,7 @@ func newProxyServer(logger klog.Logger, config *kubeproxyconfig.KubeProxyConfigu
logger.Error(err, "Kube-proxy configuration may be incomplete or incorrect")
}

s.Proxier, err = s.createProxier(config, dualStackSupported, initOnly)
s.Proxier, err = s.createProxier(ctx, config, dualStackSupported, initOnly)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -816,7 +815,8 @@ func badBindAddress(bindAddress string, wrongFamily netutils.IPFamily) bool {

// createClient creates a kube client from the given config and masterOverride.
// TODO remove masterOverride when CLI flags are removed.
func createClient(logger klog.Logger, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
func createClient(ctx context.Context, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
logger := klog.FromContext(ctx)
var kubeConfig *rest.Config
var err error

Expand Down Expand Up @@ -847,7 +847,8 @@ func createClient(logger klog.Logger, config componentbaseconfig.ClientConnectio
return client, nil
}

func serveHealthz(logger klog.Logger, hz *healthcheck.ProxierHealthServer, errCh chan error) {
func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errCh chan error) {
logger := klog.FromContext(ctx)
if hz == nil {
return
}
Expand All @@ -866,7 +867,7 @@ func serveHealthz(logger klog.Logger, hz *healthcheck.ProxierHealthServer, errCh
logger.Error(nil, "Healthz server returned without error")
}
}
go wait.Until(fn, 5*time.Second, wait.NeverStop)
go wait.Until(fn, 5*time.Second, ctx.Done())
}

func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
Expand Down Expand Up @@ -911,18 +912,19 @@ func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enabl

// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
// TODO: At the moment, Run() cannot return a nil error, otherwise it's caller will never exit. Update callers of Run to handle nil errors.
func (s *ProxyServer) Run() error {
func (s *ProxyServer) Run(ctx context.Context) error {
logger := klog.FromContext(ctx)
// To help debugging, immediately log version
s.logger.Info("Version info", "version", version.Get())
logger.Info("Version info", "version", version.Get())

s.logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

// TODO(vmarmol): Use container config for this.
var oomAdjuster *oom.OOMAdjuster
if s.Config.OOMScoreAdj != nil {
oomAdjuster = oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.Config.OOMScoreAdj)); err != nil {
s.logger.V(2).Info("Failed to apply OOMScore", "err", err)
logger.V(2).Info("Failed to apply OOMScore", "err", err)
}
}

Expand All @@ -940,7 +942,7 @@ func (s *ProxyServer) Run() error {
}

// Start up a healthz server if requested
serveHealthz(s.logger, s.HealthzServer, healthzErrCh)
serveHealthz(ctx, s.HealthzServer, healthzErrCh)

// Start up a metrics server if requested
serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
Expand Down Expand Up @@ -968,16 +970,16 @@ func (s *ProxyServer) Run() error {
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.Config.ConfigSyncPeriod.Duration)
serviceConfig := config.NewServiceConfig(ctx, informerFactory.Core().V1().Services(), s.Config.ConfigSyncPeriod.Duration)
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)
go serviceConfig.Run(ctx.Done())

endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.Config.ConfigSyncPeriod.Duration)
endpointSliceConfig := config.NewEndpointSliceConfig(ctx, informerFactory.Discovery().V1().EndpointSlices(), s.Config.ConfigSyncPeriod.Duration)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)
go endpointSliceConfig.Run(ctx.Done())

if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
serviceCIDRConfig := config.NewServiceCIDRConfig(informerFactory.Networking().V1alpha1().ServiceCIDRs(), s.Config.ConfigSyncPeriod.Duration)
serviceCIDRConfig := config.NewServiceCIDRConfig(ctx, informerFactory.Networking().V1alpha1().ServiceCIDRs(), s.Config.ConfigSyncPeriod.Duration)
serviceCIDRConfig.RegisterEventHandler(s.Proxier)
go serviceCIDRConfig.Run(wait.NeverStop)
}
Expand All @@ -990,10 +992,10 @@ func (s *ProxyServer) Run() error {
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
}))
nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.Config.ConfigSyncPeriod.Duration)
nodeConfig := config.NewNodeConfig(ctx, currentNodeInformerFactory.Core().V1().Nodes(), s.Config.ConfigSyncPeriod.Duration)
// https://issues.k8s.io/111321
if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(s.podCIDRs))
nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(ctx, s.podCIDRs))
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeProxyDrainingTerminatingNodes) {
nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{
Expand Down Expand Up @@ -1039,7 +1041,8 @@ func (s *ProxyServer) birthCry() {
// 1. if bindAddress is not 0.0.0.0 or ::, then it is used as the primary IP.
// 2. if rawNodeIPs is not empty, then its address(es) is/are used
// 3. otherwise the node IPs are 127.0.0.1 and ::1
func detectNodeIPs(logger klog.Logger, rawNodeIPs []net.IP, bindAddress string) (v1.IPFamily, map[v1.IPFamily]net.IP) {
func detectNodeIPs(ctx context.Context, rawNodeIPs []net.IP, bindAddress string) (v1.IPFamily, map[v1.IPFamily]net.IP) {
logger := klog.FromContext(ctx)
primaryFamily := v1.IPv4Protocol
nodeIPs := map[v1.IPFamily]net.IP{
v1.IPv4Protocol: net.IPv4(127, 0, 0, 1),
Expand Down Expand Up @@ -1080,7 +1083,8 @@ func detectNodeIPs(logger klog.Logger, rawNodeIPs []net.IP, bindAddress string)

// getNodeIP returns IPs for the node with the provided name. If
// required, it will wait for the node to be created.
func getNodeIPs(logger klog.Logger, client clientset.Interface, name string) []net.IP {
func getNodeIPs(ctx context.Context, client clientset.Interface, name string) []net.IP {
logger := klog.FromContext(ctx)
var nodeIPs []net.IP
backoff := wait.Backoff{
Steps: 6,
Expand All @@ -1090,7 +1094,7 @@ func getNodeIPs(logger klog.Logger, client clientset.Interface, name string) []n
}

err := wait.ExponentialBackoff(backoff, func() (bool, error) {
node, err := client.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
node, err := client.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
if err != nil {
logger.Error(err, "Failed to retrieve node info")
return false, nil
Expand Down

0 comments on commit 8dd9d1a

Please sign in to comment.