Skip to content

Commit

Permalink
convert pkg/proxy/iptables to contextual logging
Browse files Browse the repository at this point in the history
Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
  • Loading branch information
fatsheep9146 committed Jan 27, 2024
1 parent 818c8d6 commit 4178a58
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 85 deletions.
2 changes: 1 addition & 1 deletion cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ func (s *ProxyServer) Run() error {
go endpointSliceConfig.Run(wait.NeverStop)

if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
serviceCIDRConfig := config.NewServiceCIDRConfig(informerFactory.Networking().V1alpha1().ServiceCIDRs(), s.Config.ConfigSyncPeriod.Duration)
serviceCIDRConfig := config.NewServiceCIDRConfig(s.logger, informerFactory.Networking().V1alpha1().ServiceCIDRs(), s.Config.ConfigSyncPeriod.Duration)
serviceCIDRConfig.RegisterEventHandler(s.Proxier)
go serviceCIDRConfig.Run(wait.NeverStop)
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/kube-proxy/app/server_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio

// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = iptables.NewDualStackProxier(
s.logger,
ipt,
utilsysctl.New(),
exec.New(),
Expand All @@ -203,6 +204,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio

// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = iptables.NewProxier(
s.logger,
s.PrimaryIPFamily,
iptInterface,
utilsysctl.New(),
Expand Down
73 changes: 39 additions & 34 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ type Proxier struct {
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer proxyutil.NetworkInterfacer

logger klog.Logger
}

// Proxier implements proxy.Provider
Expand All @@ -220,7 +222,8 @@ var _ proxy.Provider = &Proxier{}
// An error will be returned if iptables fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables up to date in the background and
// will not terminate if a particular iptables call fails.
func NewProxier(ipFamily v1.IPFamily,
func NewProxier(logger klog.Logger,
ipFamily v1.IPFamily,
ipt utiliptables.Interface,
sysctl utilsysctl.Interface,
exec utilexec.Interface,
Expand All @@ -245,7 +248,7 @@ func NewProxier(ipFamily v1.IPFamily,
if localhostNodePorts {
// Set the route_localnet sysctl we need for exposing NodePorts on loopback addresses
// Refer to https://issues.k8s.io/90259
klog.InfoS("Setting route_localnet=1 to allow node-ports on localhost; to change this either disable iptables.localhostNodePorts (--iptables-localhost-nodeports) or set nodePortAddresses (--nodeport-addresses) to filter loopback addresses")
logger.Info("Setting route_localnet=1 to allow node-ports on localhost; to change this either disable iptables.localhostNodePorts (--iptables-localhost-nodeports) or set nodePortAddresses (--nodeport-addresses) to filter loopback addresses")
if err := proxyutil.EnsureSysctl(sysctl, sysctlRouteLocalnet, 1); err != nil {
return nil, err
}
Expand All @@ -257,18 +260,18 @@ func NewProxier(ipFamily v1.IPFamily,
conntrackTCPLiberal := false
if val, err := sysctl.GetSysctl(sysctlNFConntrackTCPBeLiberal); err == nil && val != 0 {
conntrackTCPLiberal = true
klog.InfoS("nf_conntrack_tcp_be_liberal set, not installing DROP rules for INVALID packets")
logger.Info("nf_conntrack_tcp_be_liberal set, not installing DROP rules for INVALID packets")
}

if initOnly {
klog.InfoS("System initialized and --init-only specified")
logger.Info("System initialized and --init-only specified")
return nil, nil
}

// Generate the masquerade mark to use for SNAT rules.
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)
logger.V(2).Info("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)

serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)

Expand Down Expand Up @@ -301,10 +304,11 @@ func NewProxier(ipFamily v1.IPFamily,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
conntrackTCPLiberal: conntrackTCPLiberal,
logger: logger,
}

burstSyncs := 2
klog.V(2).InfoS("Iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
logger.V(2).Info("Iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
// time.Hour is arbitrary.
Expand All @@ -314,16 +318,17 @@ func NewProxier(ipFamily v1.IPFamily,
proxier.forceSyncProxyRules, syncPeriod, wait.NeverStop)

if ipt.HasRandomFully() {
klog.V(2).InfoS("Iptables supports --random-fully", "ipFamily", ipt.Protocol())
logger.V(2).Info("Iptables supports --random-fully", "ipFamily", ipt.Protocol())
} else {
klog.V(2).InfoS("Iptables does not support --random-fully", "ipFamily", ipt.Protocol())
logger.V(2).Info("Iptables does not support --random-fully", "ipFamily", ipt.Protocol())
}

return proxier, nil
}

// NewDualStackProxier creates a MetaProxier instance, with IPv4 and IPv6 proxies.
func NewDualStackProxier(
logger klog.Logger,
ipt [2]utiliptables.Interface,
sysctl utilsysctl.Interface,
exec utilexec.Interface,
Expand All @@ -341,14 +346,14 @@ func NewDualStackProxier(
initOnly bool,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(v1.IPv4Protocol, ipt[0], sysctl,
ipv4Proxier, err := NewProxier(logger, v1.IPv4Protocol, ipt[0], sysctl,
exec, syncPeriod, minSyncPeriod, masqueradeAll, localhostNodePorts, masqueradeBit, localDetectors[0], hostname,
nodeIPs[v1.IPv4Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err)
}

ipv6Proxier, err := NewProxier(v1.IPv6Protocol, ipt[1], sysctl,
ipv6Proxier, err := NewProxier(logger, v1.IPv6Protocol, ipt[1], sysctl,
exec, syncPeriod, minSyncPeriod, masqueradeAll, false, masqueradeBit, localDetectors[1], hostname,
nodeIPs[v1.IPv6Protocol], recorder, healthzServer, nodePortAddresses, initOnly)
if err != nil {
Expand Down Expand Up @@ -605,7 +610,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
Expand All @@ -621,7 +626,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
}
proxier.needFullSync = true
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)

proxier.Sync()
}
Expand All @@ -630,7 +635,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
Expand All @@ -646,7 +651,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
}
proxier.needFullSync = true
proxier.mu.Unlock()
klog.V(4).InfoS("Updated proxier node labels", "labels", node.Labels)
proxier.logger.V(4).Info("Updated proxier node labels", "labels", node.Labels)

proxier.Sync()
}
Expand All @@ -655,7 +660,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.hostname {
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
proxier.logger.Error(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
Expand Down Expand Up @@ -779,7 +784,7 @@ func (proxier *Proxier) syncProxyRules() {

// don't sync rules till we've received services and endpoints
if !proxier.isInitialized() {
klog.V(2).InfoS("Not syncing iptables until Services and Endpoints have been received from master")
proxier.logger.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}

Expand All @@ -796,18 +801,18 @@ func (proxier *Proxier) syncProxyRules() {
} else {
metrics.SyncFullProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
}
klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
proxier.logger.V(2).Info("SyncProxyRules complete", "elapsed", time.Since(start))
}()

serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

klog.V(2).InfoS("Syncing iptables rules")
proxier.logger.V(2).Info("Syncing iptables rules")

success := false
defer func() {
if !success {
klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.logger.Info("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.syncRunner.RetryAfter(proxier.syncPeriod)
if tryPartialSync {
metrics.IptablesPartialRestoreFailuresTotal.Inc()
Expand All @@ -833,7 +838,7 @@ func (proxier *Proxier) syncProxyRules() {
// (which will be very slow on hosts with lots of iptables rules).
for _, jump := range append(iptablesJumpChains, iptablesKubeletJumpChains...) {
if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
proxier.logger.Error(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
return
}
args := jump.extraArgs
Expand All @@ -842,7 +847,7 @@ func (proxier *Proxier) syncProxyRules() {
}
args = append(args, "-j", string(jump.dstChain))
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
proxier.logger.Error(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
return
}
}
Expand Down Expand Up @@ -952,7 +957,7 @@ func (proxier *Proxier) syncProxyRules() {
for svcName, svc := range proxier.svcPortMap {
svcInfo, ok := svc.(*servicePortInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
proxier.logger.Error(nil, "Failed to cast serviceInfo", "serviceName", svcName)
continue
}
protocol := strings.ToLower(string(svcInfo.Protocol()))
Expand Down Expand Up @@ -1345,7 +1350,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
proxier.logger.Error(nil, "Failed to cast endpointInfo", "endpointInfo", ep)
continue
}

Expand Down Expand Up @@ -1396,7 +1401,7 @@ func (proxier *Proxier) syncProxyRules() {
}
proxier.lastIPTablesCleanup = time.Now()
} else {
klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted")
proxier.logger.Error(err, "Failed to execute iptables-save: stale chains will not be deleted")
}
}

Expand All @@ -1420,15 +1425,15 @@ func (proxier *Proxier) syncProxyRules() {
} else {
nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer)
if err != nil {
klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
proxier.logger.Error(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
}
for _, ip := range nodeIPs {
if ip.IsLoopback() {
if isIPv6 {
klog.ErrorS(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported on IPv6", "address", ip.String())
proxier.logger.Error(nil, "--nodeport-addresses includes localhost but localhost NodePorts are not supported on IPv6", "address", ip.String())
continue
} else if !proxier.localhostNodePorts {
klog.ErrorS(nil, "--nodeport-addresses includes localhost but --iptables-localhost-nodeports=false was passed", "address", ip.String())
proxier.logger.Error(nil, "--nodeport-addresses includes localhost but --iptables-localhost-nodeports=false was passed", "address", ip.String())
continue
}
}
Expand Down Expand Up @@ -1491,24 +1496,24 @@ func (proxier *Proxier) syncProxyRules() {
proxier.iptablesData.Write(proxier.natRules.Bytes())
proxier.iptablesData.WriteString("COMMIT\n")

klog.V(2).InfoS("Reloading service iptables data",
proxier.logger.V(2).Info("Reloading service iptables data",
"numServices", len(proxier.svcPortMap),
"numEndpoints", totalEndpoints,
"numFilterChains", proxier.filterChains.Lines(),
"numFilterRules", proxier.filterRules.Lines(),
"numNATChains", proxier.natChains.Lines(),
"numNATRules", proxier.natRules.Lines(),
)
klog.V(9).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
proxier.logger.V(9).Info("Restoring iptables", "rules", proxier.iptablesData.Bytes())

// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
err := proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
if pErr, ok := err.(utiliptables.ParseError); ok {
lines := utiliptables.ExtractLines(proxier.iptablesData.Bytes(), pErr.Line(), 3)
klog.ErrorS(pErr, "Failed to execute iptables-restore", "rules", lines)
proxier.logger.Error(pErr, "Failed to execute iptables-restore", "rules", lines)
} else {
klog.ErrorS(err, "Failed to execute iptables-restore")
proxier.logger.Error(err, "Failed to execute iptables-restore")
}
metrics.IptablesRestoreFailuresTotal.Inc()
return
Expand All @@ -1520,7 +1525,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
proxier.logger.V(4).Info("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
}
}

Expand All @@ -1535,10 +1540,10 @@ func (proxier *Proxier) syncProxyRules() {
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
// will just drop those endpoints.
if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck services")
proxier.logger.Error(err, "Error syncing healthcheck services")
}
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
proxier.logger.Error(err, "Error syncing healthcheck endpoints")
}

// Finish housekeeping, clear stale conntrack entries for UDP Services
Expand Down

0 comments on commit 4178a58

Please sign in to comment.