Add HNS Load Balancer Healthchecks for ExternalTrafficPolicy: Local #99287

Merged
merged 1 commit into from Mar 17, 2022
6 changes: 5 additions & 1 deletion cluster/gce/windows/k8s-node-setup.psm1
@@ -892,8 +892,12 @@ function Configure-HostNetworkingService {
-Verbose
$created_hns_network = $true
}

# This name of endpoint is referred in pkg/proxy/winkernel/proxier.go as part of
# kube-proxy as well. A health check port for every service that is specified as
# "externalTrafficPolicy: local" will be added on the endpoint.
# PLEASE KEEP THEM CONSISTENT!!!
$endpoint_name = "cbr0"
Contributor

Can this be passed in as a configuration option to the Windows kube-proxy, similar to the healthzPort?

Member Author

Not sure about this one. The endpoint name is not configurable in the Windows startup scripts, so I don't think making it configurable here would be useful.


$vnic_name = "vEthernet (${endpoint_name})"

$hns_endpoint = Get-HnsEndpoint | Where-Object Name -eq $endpoint_name
2 changes: 2 additions & 0 deletions cmd/kube-proxy/app/init_windows.go
@@ -41,4 +41,6 @@ func (o *Options) addOSFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.config.Winkernel.SourceVip, "source-vip", o.config.Winkernel.SourceVip, "The IP address of the source VIP for non-DSR.")
fs.StringVar(&o.config.Winkernel.NetworkName, "network-name", o.config.Winkernel.NetworkName, "The name of the cluster network.")
fs.BoolVar(&o.config.Winkernel.EnableDSR, "enable-dsr", o.config.Winkernel.EnableDSR, "If true make kube-proxy apply DSR policies for service VIP")
fs.StringVar(&o.config.Winkernel.RootHnsEndpointName, "root-hnsendpoint-name", "cbr0", "The name of the hns endpoint name for root namespace attached to l2bridge")
fs.BoolVar(&o.config.Winkernel.ForwardHealthCheckVip, "forward-healthcheck-vip", o.config.Winkernel.ForwardHealthCheckVip, "If true forward service VIP for health check port")
}
7 changes: 7 additions & 0 deletions cmd/kube-proxy/app/server_windows.go
@@ -24,7 +24,9 @@ package app
import (
"errors"
"fmt"
"net"
goruntime "runtime"
"strconv"

// Enable pprof HTTP handlers.
_ "net/http/pprof"
@@ -97,8 +99,11 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
}

var healthzServer healthcheck.ProxierHealthUpdater
var healthzPort int
if len(config.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
_, port, _ := net.SplitHostPort(config.HealthzBindAddress)
healthzPort, _ = strconv.Atoi(port)
}

var proxier proxy.Provider
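
The hunk above derives `healthzPort` from `config.HealthzBindAddress` and discards the errors from both `net.SplitHostPort` and `strconv.Atoi`, so a malformed address silently yields port 0. As a minimal sketch only, here is one way the parsing could surface those errors. The helper name `healthzPortFromBindAddress` and the range check are assumptions for illustration and are not part of this PR.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// healthzPortFromBindAddress is a hypothetical helper (not in the PR) that
// extracts the port from a "host:port" bind address and reports parse errors
// instead of ignoring them.
func healthzPortFromBindAddress(bindAddress string) (int, error) {
	_, portStr, err := net.SplitHostPort(bindAddress)
	if err != nil {
		return 0, fmt.Errorf("invalid healthz bind address %q: %w", bindAddress, err)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return 0, fmt.Errorf("invalid healthz port %q: %w", portStr, err)
	}
	// Assumption: a usable health check port must be a valid, nonzero TCP port.
	if port < 1 || port > 65535 {
		return 0, fmt.Errorf("healthz port %d out of range", port)
	}
	return port, nil
}

func main() {
	// The last entry has no host part, so SplitHostPort rejects it.
	for _, addr := range []string{"0.0.0.0:10256", "[::]:10256", "10256"} {
		port, err := healthzPortFromBindAddress(addr)
		fmt.Println(addr, port, err)
	}
}
```

Whether kube-proxy should fail startup or just log in that case is a design decision this sketch does not settle.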
@@ -120,6 +125,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
recorder,
healthzServer,
config.Winkernel,
healthzPort,
)
} else {

@@ -134,6 +140,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
recorder,
healthzServer,
config.Winkernel,
healthzPort,
)

}
18 changes: 17 additions & 1 deletion pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default.

@@ -42,5 +42,7 @@ showHiddenMetricsForVersion: ""
udpIdleTimeout: 250ms
winkernel:
enableDSR: false
forwardHealthCheckVip: false
networkName: ""
rootHnsEndpointName: ""
sourceVip: ""
@@ -42,5 +42,7 @@ showHiddenMetricsForVersion: ""
udpIdleTimeout: 250ms
winkernel:
enableDSR: false
forwardHealthCheckVip: false
networkName: ""
rootHnsEndpointName: ""
sourceVip: ""
6 changes: 6 additions & 0 deletions pkg/proxy/apis/config/types.go
@@ -99,6 +99,12 @@ type KubeProxyWinkernelConfiguration struct {
// enableDSR tells kube-proxy whether HNS policies should be created
// with DSR
EnableDSR bool
// RootHnsEndpointName is the name of hnsendpoint that is attached to
// l2bridge for root network namespace
RootHnsEndpointName string
// ForwardHealthCheckVip forwards service VIP for health check port on
// Windows
ForwardHealthCheckVip bool
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
4 changes: 4 additions & 0 deletions pkg/proxy/apis/config/v1alpha1/zz_generated.conversion.go

Some generated files are not rendered by default.

23 changes: 21 additions & 2 deletions pkg/proxy/winkernel/hnsV1.go
@@ -33,6 +33,7 @@ type HostNetworkService interface {
getNetworkByName(name string) (*hnsNetworkInfo, error)
getEndpointByID(id string) (*endpointsInfo, error)
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
getEndpointByName(id string) (*endpointsInfo, error)
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
deleteEndpoint(hnsID string) error
getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error)
@@ -62,8 +63,9 @@ func (hns hnsV1) getEndpointByID(id string) (*endpointsInfo, error) {
return nil, err
}
return &endpointsInfo{
ip: hnsendpoint.IPAddress.String(),
isLocal: !hnsendpoint.IsRemoteEndpoint, //TODO: Change isLocal to isRemote
ip: hnsendpoint.IPAddress.String(),
//TODO: Change isLocal to isRemote
isLocal: !hnsendpoint.IsRemoteEndpoint,
macAddress: hnsendpoint.MacAddress,
hnsID: hnsendpoint.Id,
hns: hns,
@@ -108,6 +110,23 @@ func (hns hnsV1) getEndpointByIpAddress(ip string, networkName string) (*endpoin

return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}

func (hns hnsV1) getEndpointByName(name string) (*endpointsInfo, error) {
hnsendpoint, err := hcsshim.GetHNSEndpointByName(name)
if err != nil {
klog.ErrorS(err, "failed to get HNS endpoint by name", "name", name)
return nil, err
}
return &endpointsInfo{
ip: hnsendpoint.IPAddress.String(),
//TODO: Change isLocal to isRemote
isLocal: !hnsendpoint.IsRemoteEndpoint,
macAddress: hnsendpoint.MacAddress,
hnsID: hnsendpoint.Id,
hns: hns,
}, nil
}

func (hns hnsV1) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
hnsNetwork, err := hcsshim.GetHNSNetworkByName(networkName)
if err != nil {
13 changes: 13 additions & 0 deletions pkg/proxy/winkernel/hnsV2.go
@@ -114,6 +114,19 @@ func (hns hnsV2) getEndpointByIpAddress(ip string, networkName string) (*endpoin

return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}
func (hns hnsV2) getEndpointByName(name string) (*endpointsInfo, error) {
hnsendpoint, err := hcn.GetEndpointByName(name)
if err != nil {
return nil, err
}
return &endpointsInfo{ //TODO: fill out PA
ip: hnsendpoint.IpConfigurations[0].IpAddress,
isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
macAddress: hnsendpoint.MacAddress,
hnsID: hnsendpoint.Id,
hns: hns,
}, nil
}
func (hns hnsV2) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
hnsNetwork, err := hcn.GetNetworkByName(networkName)
if err != nil {
19 changes: 15 additions & 4 deletions pkg/proxy/winkernel/hns_test.go
@@ -26,6 +26,8 @@ import (

"strings"
"testing"

"github.com/google/go-cmp/cmp"
)

const (
@@ -56,12 +58,12 @@ func TestGetEndpointByID(t *testing.T) {
testGetEndpointByID(t, hnsV1)
testGetEndpointByID(t, hnsV2)
}
func TestGetEndpointByIpAddress(t *testing.T) {
func TestGetEndpointByIpAddressAndName(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}

testGetEndpointByIpAddress(t, hnsV1)
testGetEndpointByIpAddress(t, hnsV2)
testGetEndpointByIpAddressAndName(t, hnsV1)
testGetEndpointByIpAddressAndName(t, hnsV2)
}
func TestCreateEndpointLocal(t *testing.T) {
hnsV1 := hnsV1{}
@@ -165,7 +167,7 @@ func testGetEndpointByID(t *testing.T, hns HostNetworkService) {
t.Error(err)
}
}
func testGetEndpointByIpAddress(t *testing.T, hns HostNetworkService) {
func testGetEndpointByIpAddressAndName(t *testing.T, hns HostNetworkService) {
Network := mustTestNetwork(t)

ipConfig := &hcn.IpConfig{
@@ -195,6 +197,15 @@ func testGetEndpointByIpAddress(t *testing.T, hns HostNetworkService) {
t.Errorf("%v does not match %v", endpoint.ip, Endpoint.IpConfigurations[0].IpAddress)
}

endpoint, err = hns.getEndpointByName(Endpoint.Name)
if err != nil {
t.Error(err)
}
diff := cmp.Diff(endpoint, Endpoint)
if diff != "" {
t.Errorf("getEndpointByName(%s) returned a different endpoint. Diff: %s ", Endpoint.Name, diff)
}

err = Endpoint.Delete()
if err != nil {
t.Error(err)
86 changes: 64 additions & 22 deletions pkg/proxy/winkernel/proxier.go
@@ -87,8 +87,9 @@ type externalIPInfo struct {
}

type loadBalancerIngressInfo struct {
ip string
hnsID string
ip string
hnsID string
healthCheckHnsID string
}

type loadBalancerInfo struct {
@@ -548,6 +549,10 @@ type Proxier struct {
hostMac string
isDSR bool
supportedFeatures hcn.SupportedFeatures
healthzPort int

forwardHealthCheckVip bool
rootHnsEndpointName string
}

type localPort struct {
@@ -593,6 +598,7 @@ func NewProxier(
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
config config.KubeProxyWinkernelConfiguration,
healthzPort int,
) (*Proxier, error) {
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
@@ -684,24 +690,27 @@ func NewProxier(

isIPv6 := netutils.IsIPv6(nodeIP)
proxier := &Proxier{
endPointsRefCount: make(endPointsReferenceCountMap),
serviceMap: make(proxy.ServiceMap),
endpointsMap: make(proxy.EndpointsMap),
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
hns: hns,
network: *hnsNetworkInfo,
sourceVip: sourceVip,
hostMac: hostMac,
isDSR: isDSR,
supportedFeatures: supportedFeatures,
isIPv6Mode: isIPv6,
endPointsRefCount: make(endPointsReferenceCountMap),
serviceMap: make(proxy.ServiceMap),
endpointsMap: make(proxy.EndpointsMap),
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
hns: hns,
network: *hnsNetworkInfo,
sourceVip: sourceVip,
hostMac: hostMac,
isDSR: isDSR,
supportedFeatures: supportedFeatures,
isIPv6Mode: isIPv6,
healthzPort: healthzPort,
rootHnsEndpointName: config.RootHnsEndpointName,
forwardHealthCheckVip: config.ForwardHealthCheckVip,
}

ipFamily := v1.IPv4Protocol
@@ -730,18 +739,19 @@ func NewDualStackProxier(
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
config config.KubeProxyWinkernelConfiguration,
healthzPort int,
) (proxy.Provider, error) {

// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit,
clusterCIDR, hostname, nodeIP[0], recorder, healthzServer, config)
clusterCIDR, hostname, nodeIP[0], recorder, healthzServer, config, healthzPort)

if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIP[0])
}

ipv6Proxier, err := NewProxier(syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit,
clusterCIDR, hostname, nodeIP[1], recorder, healthzServer, config)
clusterCIDR, hostname, nodeIP[1], recorder, healthzServer, config, healthzPort)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIP[1])
}
@@ -796,6 +806,10 @@ func (svcInfo *serviceInfo) deleteAllHnsLoadBalancerPolicy() {
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
hns.deleteLoadBalancer(lbIngressIP.hnsID)
lbIngressIP.hnsID = ""
if lbIngressIP.healthCheckHnsID != "" {
hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID)
lbIngressIP.healthCheckHnsID = ""
}
}
}

@@ -988,6 +1002,11 @@ func (proxier *Proxier) syncProxyRules() {
hnsNetworkName := proxier.network.name
hns := proxier.hns

var gatewayHnsendpoint *endpointsInfo
if proxier.forwardHealthCheckVip {
gatewayHnsendpoint, _ = hns.getEndpointByName(proxier.rootHnsEndpointName)
}

prevNetworkID := proxier.network.id
updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
@@ -1319,7 +1338,30 @@ func (proxier *Proxier) syncProxyRules() {
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
}
lbIngressIP.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)

if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil {
nodeport := proxier.healthzPort
Member

@aojea, Nov 19, 2021

But this is not correct: this port is about the healthz of kube-proxy itself, not about whether the service has local endpoints. To be honest, I don't fully understand the problem. What is the TL;DR of why NewServiceHealthServer doesn't work on Windows? That should fix the problem, no?

	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, []string{} /* windows listen to all node addresses */)

Member Author

The problem is that the destination is not a node address. The Google load balancer works this way:

User --> [clientip:12345 to lb_vip:80] -> Google LB -> [clientip:12345 to lb_vip:80] -> Node -> [clientip:12345 to pod_ip:80] -> Pod

This already works in kube-proxy's winkernel mode because the rules are in place.

What doesn't work is the health check for a normal service:

Google LB -> [lb:12345 to lb_vip:10256] -> Node -> X(dropped)

and for an externalTrafficPolicy: Local service:

Google LB -> [lb:12345 to lb_vip:<per-service-hc-port>] -> Node -> X(dropped)

They are dropped because the destination is not one of the node's IPs.

This change adds HNS rules that rewrite the destination of these requests to the node IP.
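
The drop-versus-forward behaviour described in this comment can be pictured with a toy model. The sketch below is an illustration only: `packet`, `forwardRule`, and `deliver` are hypothetical names, the addresses are made-up documentation values, and nothing here talks to HNS or reflects how HNS evaluates policies internally.

```go
package main

import "fmt"

// packet is a toy stand-in for an inbound health check probe; only the
// destination matters for this illustration.
type packet struct {
	dstIP   string
	dstPort int
}

// forwardRule is a hypothetical, simplified picture of a rule that maps
// lb_vip:port to nodeIP:port.
type forwardRule struct {
	vip    string
	port   int
	nodeIP string
}

// deliver returns the local address that would receive pkt, or ok=false if
// the packet is dropped because its destination is neither covered by a
// rule nor one of the node's own IPs.
func deliver(pkt packet, nodeIPs map[string]bool, rules []forwardRule) (string, bool) {
	for _, r := range rules {
		if r.vip == pkt.dstIP && r.port == pkt.dstPort {
			return fmt.Sprintf("%s:%d", r.nodeIP, pkt.dstPort), true
		}
	}
	if nodeIPs[pkt.dstIP] {
		return fmt.Sprintf("%s:%d", pkt.dstIP, pkt.dstPort), true
	}
	return "", false
}

func main() {
	nodeIPs := map[string]bool{"10.0.0.5": true}
	probe := packet{dstIP: "203.0.113.10", dstPort: 10256} // addressed to lb_vip

	_, ok := deliver(probe, nodeIPs, nil)
	fmt.Println("without rule, delivered:", ok)

	rules := []forwardRule{{vip: "203.0.113.10", port: 10256, nodeIP: "10.0.0.5"}}
	addr, ok := deliver(probe, nodeIPs, rules)
	fmt.Println("with rule, delivered:", ok, "to", addr)
}
```

In the model, the probe to the VIP is dropped until a rule for that VIP and port exists, which is the gap the new health check load balancer policy is meant to close.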

Member

Yeah, but why does it use the kube-proxy healthz port? It should use the health check node port, no?

Member

The part where I'm lost is when it has to use the healthz port. I'm not familiar with the internals of GCE LBs, so maybe we should get a reviewer who is more familiar with this 😅

Member Author

> yeah, but why it uses the kube-proxy healthz port , it should use the healthzcheck nodeport, no?

proxier.healthzPort is basically 10256 unless it is explicitly configured:

// NewOptions returns initialized Options
func NewOptions() *Options {
return &Options{
config: new(kubeproxyconfig.KubeProxyConfiguration),
healthzPort: ports.ProxyHealthzPort,
metricsPort: ports.ProxyStatusPort,
errCh: make(chan error),
}
}

o.config.HealthzBindAddress = addressFromDeprecatedFlags(o.config.HealthzBindAddress, o.healthzPort)

ProxyHealthzPort = 10256

This is effectively the health check node port for a normal service. For externalTrafficPolicy: Local, the port is different for every service.

> the part I'm lost is about when it has to use healthz port, but I'm not familiar with the internals of GCE LBs, maybe we should get a reviewer more familiar with this 😅

I understand it's really not straightforward. I'll try to get someone from the GKE service team to review as well :) @robscott @bowei can you help take a look?
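
The port choice discussed in this thread can be summarized in a few lines. The following is a sketch under assumptions: `healthCheckPort` and `defaultProxyHealthzPort` are hypothetical names, and the function only mirrors the rule stated in the comments (use the per-service health check node port when one is set, otherwise fall back to the kube-proxy healthz port, 10256 by default).

```go
package main

import "fmt"

// defaultProxyHealthzPort mirrors the ProxyHealthzPort value quoted in the
// thread; the constant name itself is made up for this sketch.
const defaultProxyHealthzPort = 10256

// healthCheckPort returns the port a load balancer health check is expected
// to probe: the per-service health check node port when the service has one
// (externalTrafficPolicy: Local), otherwise the kube-proxy healthz port.
func healthCheckPort(healthzPort, healthCheckNodePort int) int {
	if healthCheckNodePort != 0 {
		return healthCheckNodePort
	}
	return healthzPort
}

func main() {
	// A normal service has no health check node port (0).
	fmt.Println(healthCheckPort(defaultProxyHealthzPort, 0))
	// A Local-policy service with an example per-service port.
	fmt.Println(healthCheckPort(defaultProxyHealthzPort, 31234))
}
```

The value 31234 is an arbitrary example; real health check node ports are allocated per service.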

Member Author

cc @freehan can you take a look?

Member

Just catching up on this thread. I think this makes sense. Although we don't need to do this in other proxier implementations, it makes sense that we'd need to do this lb_vip -> node ip translation here to get health checks working. I'm less familiar with how the primary/default health check needs to work here. Regardless of OS, that seems to be handled exclusively at a higher level, with no extra logic added in either the iptables or winkernel proxier implementations:

healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)

I'm not familiar enough with how this is set up to know what's necessary, but is it possible that this works without any changes for the default health check? The original bug as reported by the PR seems to indicate that the problem is only with ExternalTrafficPolicy: Local.

Member Author

At a higher level, healthcheck.NewProxierHealthServer() is called to create an HTTP server that listens on the configured bind address for health checks. Because it listens on 0.0.0.0, on Linux it accepts the connection regardless of the destination address. On Windows, unfortunately, an additional rule is needed to forward the VIP to the node IP so that the health check server can receive the traffic.
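
As a small illustration of the wildcard bind mentioned above, the sketch below starts an HTTP server on 0.0.0.0 and probes it through the loopback address. It only shows that a wildcard listener answers on a local address it was not explicitly bound to; it does not reproduce the load balancer VIP case, and `startHealthz` and `probe` are hypothetical helpers, not kube-proxy code.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
)

// startHealthz starts a minimal /healthz server on the IPv4 wildcard address
// with an OS-chosen port. It returns the port and a function that stops the
// server.
func startHealthz() (int, func(), error) {
	ln, err := net.Listen("tcp4", "0.0.0.0:0")
	if err != nil {
		return 0, nil, err
	}
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "ok")
	})
	srv := &http.Server{Handler: mux}
	go srv.Serve(ln) // returns once srv.Close is called
	port := ln.Addr().(*net.TCPAddr).Port
	return port, func() { srv.Close() }, nil
}

// probe issues GET /healthz against ip:port and returns the response body.
func probe(ip string, port int) (string, error) {
	resp, err := http.Get(fmt.Sprintf("http://%s:%d/healthz", ip, port))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	port, stop, err := startHealthz()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer stop()
	// The server was bound to 0.0.0.0, yet it answers on the loopback address.
	body, err := probe("127.0.0.1", port)
	fmt.Println(body, err)
}
```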

if svcInfo.HealthCheckNodePort() != 0 {
nodeport = svcInfo.HealthCheckNodePort()
}
hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
[]endpointsInfo{*gatewayHnsendpoint},
loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
sourceVip,
lbIngressIP.ip,
Enum(svcInfo.Protocol()),
uint16(nodeport),
uint16(nodeport),
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
continue
}
lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP)
}
}
svcInfo.policyApplied = true
klog.V(2).InfoS("Policy successfully applied for service", "serviceInfo", svcInfo)