Migrate pkg/proxy to contextual logging: Part 1 #122979

Merged
57 changes: 30 additions & 27 deletions cmd/kube-proxy/app/conntrack.go
@@ -17,6 +17,7 @@ limitations under the License.
 package app
 
 import (
+	"context"
 	"errors"
 	"os"
 	"strconv"
@@ -33,30 +34,30 @@ import (
 // https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
 type Conntracker interface {
 	// SetMax adjusts nf_conntrack_max.
-	SetMax(max int) error
+	SetMax(ctx context.Context, max int) error
 	// SetTCPEstablishedTimeout adjusts nf_conntrack_tcp_timeout_established.
-	SetTCPEstablishedTimeout(seconds int) error
+	SetTCPEstablishedTimeout(ctx context.Context, seconds int) error
 	// SetTCPCloseWaitTimeout adjusts nf_conntrack_tcp_timeout_close_wait.
-	SetTCPCloseWaitTimeout(seconds int) error
+	SetTCPCloseWaitTimeout(ctx context.Context, seconds int) error
 	// SetTCPBeLiberal adjusts nf_conntrack_tcp_be_liberal.
-	SetTCPBeLiberal(value int) error
+	SetTCPBeLiberal(ctx context.Context, value int) error
 	// SetUDPTimeout adjusts nf_conntrack_udp_timeout.
-	SetUDPTimeout(seconds int) error
+	SetUDPTimeout(ctx context.Context, seconds int) error
 	// SetUDPStreamTimeout adjusts nf_conntrack_udp_timeout_stream.
-	SetUDPStreamTimeout(seconds int) error
+	SetUDPStreamTimeout(ctx context.Context, seconds int) error
 }
 
 type realConntracker struct {
-	logger klog.Logger
 }
 
 var errReadOnlySysFS = errors.New("readOnlySysFS")
 
-func (rct realConntracker) SetMax(max int) error {
-	if err := rct.setIntSysCtl("nf_conntrack_max", max); err != nil {
+func (rct realConntracker) SetMax(ctx context.Context, max int) error {
+	logger := klog.FromContext(ctx)
+	if err := rct.setIntSysCtl(ctx, "nf_conntrack_max", max); err != nil {
 		return err
 	}
-	rct.logger.Info("Setting nf_conntrack_max", "nfConntrackMax", max)
+	logger.Info("Setting nf_conntrack_max", "nfConntrackMax", max)
 
 	// Linux does not support writing to /sys/module/nf_conntrack/parameters/hashsize
 	// when the writer process is not in the initial network namespace
@@ -79,44 +80,45 @@ func (rct realConntracker) SetMax(max int) error {
 	// don't set conntrack hashsize and return a special error
 	// errReadOnlySysFS here. The caller should deal with
 	// errReadOnlySysFS differently.
-	writable, err := rct.isSysFSWritable()
+	writable, err := rct.isSysFSWritable(ctx)
 	if err != nil {
 		return err
 	}
 	if !writable {
 		return errReadOnlySysFS
 	}
 	// TODO: generify this and sysctl to a new sysfs.WriteInt()
-	rct.logger.Info("Setting conntrack hashsize", "conntrackHashsize", max/4)
+	logger.Info("Setting conntrack hashsize", "conntrackHashsize", max/4)
 	return writeIntStringFile("/sys/module/nf_conntrack/parameters/hashsize", max/4)
 }
 
-func (rct realConntracker) SetTCPEstablishedTimeout(seconds int) error {
-	return rct.setIntSysCtl("nf_conntrack_tcp_timeout_established", seconds)
+func (rct realConntracker) SetTCPEstablishedTimeout(ctx context.Context, seconds int) error {
+	return rct.setIntSysCtl(ctx, "nf_conntrack_tcp_timeout_established", seconds)
 }
 
-func (rct realConntracker) SetTCPCloseWaitTimeout(seconds int) error {
-	return rct.setIntSysCtl("nf_conntrack_tcp_timeout_close_wait", seconds)
+func (rct realConntracker) SetTCPCloseWaitTimeout(ctx context.Context, seconds int) error {
	return rct.setIntSysCtl(ctx, "nf_conntrack_tcp_timeout_close_wait", seconds)
 }
 
-func (rct realConntracker) SetTCPBeLiberal(value int) error {
-	return rct.setIntSysCtl("nf_conntrack_tcp_be_liberal", value)
+func (rct realConntracker) SetTCPBeLiberal(ctx context.Context, value int) error {
+	return rct.setIntSysCtl(ctx, "nf_conntrack_tcp_be_liberal", value)
 }
 
-func (rct realConntracker) SetUDPTimeout(seconds int) error {
-	return rct.setIntSysCtl("nf_conntrack_udp_timeout", seconds)
+func (rct realConntracker) SetUDPTimeout(ctx context.Context, seconds int) error {
+	return rct.setIntSysCtl(ctx, "nf_conntrack_udp_timeout", seconds)
 }
 
-func (rct realConntracker) SetUDPStreamTimeout(seconds int) error {
-	return rct.setIntSysCtl("nf_conntrack_udp_timeout_stream", seconds)
+func (rct realConntracker) SetUDPStreamTimeout(ctx context.Context, seconds int) error {
+	return rct.setIntSysCtl(ctx, "nf_conntrack_udp_timeout_stream", seconds)
 }
 
-func (rct realConntracker) setIntSysCtl(name string, value int) error {
+func (rct realConntracker) setIntSysCtl(ctx context.Context, name string, value int) error {
+	logger := klog.FromContext(ctx)
 	entry := "net/netfilter/" + name
 
 	sys := sysctl.New()
 	if val, _ := sys.GetSysctl(entry); val != value {
-		rct.logger.Info("Set sysctl", "entry", entry, "value", value)
+		logger.Info("Set sysctl", "entry", entry, "value", value)
 		if err := sys.SetSysctl(entry, value); err != nil {
 			return err
 		}
@@ -125,13 +127,14 @@ func (rct realConntracker) setIntSysCtl(name string, value int) error {
 }
 
 // isSysFSWritable checks /proc/mounts to see whether sysfs is 'rw' or not.
-func (rct realConntracker) isSysFSWritable() (bool, error) {
+func (rct realConntracker) isSysFSWritable(ctx context.Context) (bool, error) {
+	logger := klog.FromContext(ctx)
 	const permWritable = "rw"
 	const sysfsDevice = "sysfs"
 	m := mount.New("" /* default mount path */)
 	mountPoints, err := m.List()
 	if err != nil {
-		rct.logger.Error(err, "Failed to list mount points")
+		logger.Error(err, "Failed to list mount points")
 		return false, err
 	}
 
@@ -143,7 +146,7 @@ func (rct realConntracker) isSysFSWritable() (bool, error) {
 		if len(mountPoint.Opts) > 0 && mountPoint.Opts[0] == permWritable {
 			return true, nil
 		}
-		rct.logger.Error(nil, "Sysfs is not writable", "mountPoint", mountPoint, "mountOptions", mountPoint.Opts)
+		logger.Error(nil, "Sysfs is not writable", "mountPoint", mountPoint, "mountOptions", mountPoint.Opts)
 		return false, errReadOnlySysFS
 	}
 
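
The pattern above repeats throughout the PR: each method gains ctx context.Context as its first parameter, and the logger is recovered with klog.FromContext(ctx) instead of living in a field on the receiver. A minimal, runnable sketch of that pattern (setMax and the "component" value are illustrative, not from this PR):

    package main

    import (
        "context"

        "k8s.io/klog/v2"
    )

    // setMax stands in for a method like realConntracker.SetMax: it takes a
    // context first and recovers its logger from that context.
    func setMax(ctx context.Context, max int) {
        logger := klog.FromContext(ctx)
        logger.Info("Setting nf_conntrack_max", "nfConntrackMax", max)
    }

    func main() {
        // The caller controls what the callee logs with, e.g. a logger that
        // carries extra key/value pairs.
        logger := klog.LoggerWithValues(klog.Background(), "component", "conntrack")
        ctx := klog.NewContext(context.Background(), logger)
        setMax(ctx, 131072)
    }

The payoff is that callers (including tests) can inject a logger with names, values, or verbosity of their choosing without the callee carrying a logger field, which is why the logger klog.Logger fields are deleted in this diff.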
76 changes: 40 additions & 36 deletions cmd/kube-proxy/app/server.go
@@ -96,7 +96,7 @@ func init() {
 
 // proxyRun defines the interface to run a specified ProxyServer
 type proxyRun interface {
-	Run() error
+	Run(ctx context.Context) error
 }
 
 // Options contains everything necessary to create and run a proxy server.
@@ -371,20 +371,20 @@ func (o *Options) Validate() error {
 }
 
 // Run runs the specified ProxyServer.
-func (o *Options) Run() error {
+func (o *Options) Run(ctx context.Context) error {
 	defer close(o.errCh)
 	if len(o.WriteConfigTo) > 0 {
 		return o.writeConfigFile()
 	}
 
-	err := platformCleanup(o.config.Mode, o.CleanupAndExit)
+	err := platformCleanup(ctx, o.config.Mode, o.CleanupAndExit)
 	if o.CleanupAndExit {
 		return err
 	}
 	// We ignore err otherwise; the cleanup is best-effort, and the backends will have
 	// logged messages if they failed in interesting ways.
 
-	proxyServer, err := newProxyServer(o.logger, o.config, o.master, o.InitAndExit)
+	proxyServer, err := newProxyServer(ctx, o.config, o.master, o.InitAndExit)
 	if err != nil {
 		return err
 	}
@@ -393,19 +393,19 @@ func (o *Options) Run() error {
 	}
 
 	o.proxyServer = proxyServer
-	return o.runLoop()
+	return o.runLoop(ctx)
 }
 
 // runLoop will watch on the update change of the proxy server's configuration file.
 // Return an error when updated
-func (o *Options) runLoop() error {
+func (o *Options) runLoop(ctx context.Context) error {
 	if o.watcher != nil {
 		o.watcher.Run()
 	}
 
 	// run the proxy in goroutine
 	go func() {
-		err := o.proxyServer.Run()
+		err := o.proxyServer.Run(ctx)
 		o.errCh <- err
 	}()
@@ -554,7 +554,7 @@ with the apiserver API to configure the proxy.`,
 	}
 	// add feature enablement metrics
 	utilfeature.DefaultMutableFeatureGate.AddMetrics()
-	if err := opts.Run(); err != nil {
+	if err := opts.Run(context.Background()); err != nil {

A Contributor commented on this line:

A comment unrelated to context logging: it might be good to catch termination signals in the context. It would make a more graceful termination possible, but then again, it might break something @danwinship?

        ctx, _ := signal.NotifyContext(
                context.Background(), syscall.SIGINT, syscall.SIGTERM)

A Contributor replied:

I think the big constraint for exiting is that, ideally, we don't want to exit in the middle of syncProxyRules, because then we might fail to do some conntrack cleanup, etc. So if we get INT/TERM, and we aren't inside syncProxyRules, then we can exit right away, but if we are inside syncProxyRules, we should finish and then exit.

I don't know what the right way to implement that is.
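
One possible shape for that, sketched purely as an illustration (syncMu, the loop, and the sleeps are hypothetical; nothing like this exists in kube-proxy): hold a mutex for the duration of each sync and make shutdown wait for it.

    package main

    import (
        "context"
        "os"
        "os/signal"
        "sync"
        "syscall"
        "time"
    )

    // syncMu is held while rules are being written; shutdown waits on it.
    var syncMu sync.Mutex

    // syncProxyRules stands in for the real sync, which must not be
    // interrupted halfway through (conntrack cleanup etc.).
    func syncProxyRules() {
        syncMu.Lock()
        defer syncMu.Unlock()
        time.Sleep(2 * time.Second) // pretend to program rules
    }

    func main() {
        ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
        defer stop()

        go func() { // the proxy loop keeps syncing until cancelled
            for ctx.Err() == nil {
                syncProxyRules()
                time.Sleep(time.Second)
            }
        }()

        <-ctx.Done()  // INT/TERM received
        syncMu.Lock() // block until any in-flight sync completes
        os.Exit(0)
    }

This is deliberately simplified (a sync started between cancellation and the final Lock would still be waited on, which is the desired behavior here), but it shows how signal.NotifyContext composes with the ctx plumbing this PR introduces.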

@uablrek (Contributor) commented on Apr 8, 2024:

IMO context.Background() here is a bad option. Subsequent calls to klog.FromContext will use the fallback (global) logger, and that may not be what we want. In the future we may want the opportunity to use some customized logger in kube-proxy, without having to change the global logger. I think there should be a "root context" from which all sub-contexts are derived, and as a consequence, a "root logger". @pohly, please comment on this.

I propose that newOptions() takes a logger as parameter (and is renamed NewOptions->newOptions):

func NewProxyCommand() *cobra.Command {
	logger := klog.LoggerWithName(klog.NewKlogr(), "kube-proxy")
	opts := newOptions(logger)

Then call "Run()" with a context with this logger set:

			ctx := klog.NewContext(context.Background(), logger)
			if err := opts.Run(ctx); err != nil {
				logger.Error(err, "Error running ProxyServer")
				return err
			}

I tested this, and in a perfect world, all log-entries should have "logger": "kube-proxy". They don't.
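
That matches klog's fallback behavior: klog.FromContext returns the global logger (klog.Background()) when a context carries no logger, so any call path that still starts from a bare context.Background() or context.TODO(), or that logs through the global klog functions, produces entries without the "kube-proxy" name. A small sketch of the difference (illustrative only):

    package main

    import (
        "context"

        "k8s.io/klog/v2"
    )

    func work(ctx context.Context) {
        // Falls back to the global klog logger if ctx carries none.
        klog.FromContext(ctx).Info("hello")
    }

    func main() {
        named := klog.LoggerWithName(klog.Background(), "kube-proxy")

        work(klog.NewContext(context.Background(), named)) // logged with logger="kube-proxy"
        work(context.Background())                         // fallback: no logger name
    }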

 		opts.logger.Error(err, "Error running ProxyServer")
 		return err
 	}
@@ -597,15 +597,14 @@ type ProxyServer struct {
 	podCIDRs []string // only used for LocalModeNodeCIDR
 
 	Proxier proxy.Provider
-
-	logger klog.Logger
 }
 
 // newProxyServer creates a ProxyServer based on the given config
-func newProxyServer(logger klog.Logger, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
+func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
+	logger := klog.FromContext(ctx)
 
 	s := &ProxyServer{
 		Config: config,
-		logger: logger,
 	}
 
 	cz, err := configz.New(kubeproxyconfig.GroupName)
@@ -623,13 +622,13 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
 		return nil, err
 	}
 
-	s.Client, err = createClient(logger, config.ClientConnection, master)
+	s.Client, err = createClient(ctx, config.ClientConnection, master)
 	if err != nil {
 		return nil, err
 	}
 
-	rawNodeIPs := getNodeIPs(logger, s.Client, s.Hostname)
-	s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(logger, rawNodeIPs, config.BindAddress)
+	rawNodeIPs := getNodeIPs(ctx, s.Client, s.Hostname)
+	s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(ctx, rawNodeIPs, config.BindAddress)
 
 	if len(config.NodePortAddresses) == 1 && config.NodePortAddresses[0] == kubeproxyconfig.NodePortAddressesPrimary {
 		var nodePortAddresses []string
@@ -656,7 +655,7 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
 		s.HealthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
 	}
 
-	err = s.platformSetup()
+	err = s.platformSetup(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -666,7 +665,7 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
 		logger.Error(err, "Kube-proxy configuration may be incomplete or incorrect")
 	}
 
-	ipv4Supported, ipv6Supported, dualStackSupported, err := s.platformCheckSupported()
+	ipv4Supported, ipv6Supported, dualStackSupported, err := s.platformCheckSupported(ctx)
 	if err != nil {
 		return nil, err
 	} else if (s.PrimaryIPFamily == v1.IPv4Protocol && !ipv4Supported) || (s.PrimaryIPFamily == v1.IPv6Protocol && !ipv6Supported) {
@@ -685,7 +684,7 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfiguration, master string, initOnly bool) (*ProxyServer, error) {
 		logger.Error(err, "Kube-proxy configuration may be incomplete or incorrect")
 	}
 
-	s.Proxier, err = s.createProxier(config, dualStackSupported, initOnly)
+	s.Proxier, err = s.createProxier(ctx, config, dualStackSupported, initOnly)
 	if err != nil {
 		return nil, err
 	}
@@ -816,7 +815,8 @@ func badBindAddress(bindAddress string, wrongFamily netutils.IPFamily) bool {
 
 // createClient creates a kube client from the given config and masterOverride.
 // TODO remove masterOverride when CLI flags are removed.
-func createClient(logger klog.Logger, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
+func createClient(ctx context.Context, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
+	logger := klog.FromContext(ctx)
 	var kubeConfig *rest.Config
 	var err error
 
@@ -847,7 +847,8 @@ func createClient(ctx context.Context, config componentbaseconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, error) {
 	return client, nil
 }
 
-func serveHealthz(logger klog.Logger, hz *healthcheck.ProxierHealthServer, errCh chan error) {
+func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errCh chan error) {
+	logger := klog.FromContext(ctx)
 	if hz == nil {
 		return
 	}
@@ -866,7 +867,7 @@ func serveHealthz(ctx context.Context, hz *healthcheck.ProxierHealthServer, errCh chan error) {
 			logger.Error(nil, "Healthz server returned without error")
 		}
 	}
-	go wait.Until(fn, 5*time.Second, wait.NeverStop)
+	go wait.Until(fn, 5*time.Second, ctx.Done())
 }
 
 func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
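
Swapping wait.NeverStop for ctx.Done() is what actually ties these loops to the new context: wait.Until runs its function once immediately and then every period until the stop channel closes, so the healthz retry loop now ends when the context is cancelled rather than running forever. A runnable sketch of that behavior (the 3-second timeout is only for demonstration):

    package main

    import (
        "context"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()

        // Prints "tick" roughly three times, then returns once ctx is done.
        wait.Until(func() { fmt.Println("tick") }, time.Second, ctx.Done())
        fmt.Println("stopped")
    }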
@@ -911,18 +912,19 @@ func serveMetrics(bindAddress string, proxyMode kubeproxyconfig.ProxyMode, enableProfiling bool, errCh chan error) {
 
 // Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
 // TODO: At the moment, Run() cannot return a nil error, otherwise it's caller will never exit. Update callers of Run to handle nil errors.
-func (s *ProxyServer) Run() error {
+func (s *ProxyServer) Run(ctx context.Context) error {
+	logger := klog.FromContext(ctx)
 	// To help debugging, immediately log version
-	s.logger.Info("Version info", "version", version.Get())
+	logger.Info("Version info", "version", version.Get())
 
-	s.logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
+	logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
 
 	// TODO(vmarmol): Use container config for this.
 	var oomAdjuster *oom.OOMAdjuster
 	if s.Config.OOMScoreAdj != nil {
 		oomAdjuster = oom.NewOOMAdjuster()
 		if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.Config.OOMScoreAdj)); err != nil {
-			s.logger.V(2).Info("Failed to apply OOMScore", "err", err)
+			logger.V(2).Info("Failed to apply OOMScore", "err", err)
 		}
 	}
 
@@ -940,7 +942,7 @@ func (s *ProxyServer) Run(ctx context.Context) error {
 	}
 
 	// Start up a healthz server if requested
-	serveHealthz(s.logger, s.HealthzServer, healthzErrCh)
+	serveHealthz(ctx, s.HealthzServer, healthzErrCh)
 
 	// Start up a metrics server if requested
 	serveMetrics(s.Config.MetricsBindAddress, s.Config.Mode, s.Config.EnableProfiling, metricsErrCh)
@@ -968,16 +970,16 @@ func (s *ProxyServer) Run(ctx context.Context) error {
 	// Note: RegisterHandler() calls need to happen before creation of Sources because sources
 	// only notify on changes, and the initial update (on process start) may be lost if no handlers
 	// are registered yet.
-	serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.Config.ConfigSyncPeriod.Duration)
+	serviceConfig := config.NewServiceConfig(ctx, informerFactory.Core().V1().Services(), s.Config.ConfigSyncPeriod.Duration)
 	serviceConfig.RegisterEventHandler(s.Proxier)
-	go serviceConfig.Run(wait.NeverStop)
+	go serviceConfig.Run(ctx.Done())
 
-	endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.Config.ConfigSyncPeriod.Duration)
+	endpointSliceConfig := config.NewEndpointSliceConfig(ctx, informerFactory.Discovery().V1().EndpointSlices(), s.Config.ConfigSyncPeriod.Duration)
 	endpointSliceConfig.RegisterEventHandler(s.Proxier)
-	go endpointSliceConfig.Run(wait.NeverStop)
+	go endpointSliceConfig.Run(ctx.Done())
 
 	if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
-		serviceCIDRConfig := config.NewServiceCIDRConfig(informerFactory.Networking().V1alpha1().ServiceCIDRs(), s.Config.ConfigSyncPeriod.Duration)
+		serviceCIDRConfig := config.NewServiceCIDRConfig(ctx, informerFactory.Networking().V1alpha1().ServiceCIDRs(), s.Config.ConfigSyncPeriod.Duration)
 		serviceCIDRConfig.RegisterEventHandler(s.Proxier)
 		go serviceCIDRConfig.Run(wait.NeverStop)
 	}
@@ -990,10 +992,10 @@ func (s *ProxyServer) Run(ctx context.Context) error {
 		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
 			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
 		}))
-	nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.Config.ConfigSyncPeriod.Duration)
+	nodeConfig := config.NewNodeConfig(ctx, currentNodeInformerFactory.Core().V1().Nodes(), s.Config.ConfigSyncPeriod.Duration)
 	// https://issues.k8s.io/111321
 	if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
-		nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(s.podCIDRs))
+		nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(ctx, s.podCIDRs))
 	}
 	if utilfeature.DefaultFeatureGate.Enabled(features.KubeProxyDrainingTerminatingNodes) {
 		nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{
@@ -1039,7 +1041,8 @@ func (s *ProxyServer) birthCry() {
 // 1. if bindAddress is not 0.0.0.0 or ::, then it is used as the primary IP.
 // 2. if rawNodeIPs is not empty, then its address(es) is/are used
 // 3. otherwise the node IPs are 127.0.0.1 and ::1
-func detectNodeIPs(logger klog.Logger, rawNodeIPs []net.IP, bindAddress string) (v1.IPFamily, map[v1.IPFamily]net.IP) {
+func detectNodeIPs(ctx context.Context, rawNodeIPs []net.IP, bindAddress string) (v1.IPFamily, map[v1.IPFamily]net.IP) {
+	logger := klog.FromContext(ctx)
 	primaryFamily := v1.IPv4Protocol
 	nodeIPs := map[v1.IPFamily]net.IP{
 		v1.IPv4Protocol: net.IPv4(127, 0, 0, 1),
@@ -1080,7 +1083,8 @@ func detectNodeIPs(ctx context.Context, rawNodeIPs []net.IP, bindAddress string) (v1.IPFamily, map[v1.IPFamily]net.IP) {
 
 // getNodeIP returns IPs for the node with the provided name. If
 // required, it will wait for the node to be created.
-func getNodeIPs(logger klog.Logger, client clientset.Interface, name string) []net.IP {
+func getNodeIPs(ctx context.Context, client clientset.Interface, name string) []net.IP {
+	logger := klog.FromContext(ctx)
 	var nodeIPs []net.IP
 	backoff := wait.Backoff{
 		Steps: 6,
@@ -1090,7 +1094,7 @@ func getNodeIPs(ctx context.Context, client clientset.Interface, name string) []net.IP {
 	}
 
 	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
-		node, err := client.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
+		node, err := client.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
 		if err != nil {
 			logger.Error(err, "Failed to retrieve node info")
 			return false, nil
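
Passing ctx into the Nodes().Get call cancels an in-flight API request, but the surrounding wait.ExponentialBackoff still sleeps between attempts with no way to interrupt it. If full cancellation were wanted here, the context-aware variant in apimachinery could be used instead; a sketch under that assumption (this is not what the PR does, and the condition-function signature has varied across apimachinery versions):

    	err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
    		node, err := client.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
    		if err != nil {
    			logger.Error(err, "Failed to retrieve node info")
    			return false, nil // retry on the next backoff step
    		}
    		// ... extract the node addresses as before ...
    		return true, nil
    	})

wait.ExponentialBackoffWithContext selects on ctx.Done() while waiting between attempts, so cancellation would stop both the API call and the retry loop promptly.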