Skip to content

Commit

Permalink
Merge pull request #99287 from anfernee/clientip
Browse files Browse the repository at this point in the history
Add HNS Load Balancer Healthchecks for ExternalTrafficPolicy: Local
  • Loading branch information
k8s-ci-robot committed Mar 17, 2022
2 parents ca2cd3b + 78a507b commit 41b29e6
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 32 deletions.
6 changes: 5 additions & 1 deletion cluster/gce/windows/k8s-node-setup.psm1
Expand Up @@ -892,8 +892,12 @@ function Configure-HostNetworkingService {
-Verbose
$created_hns_network = $true
}

# This name of endpoint is referred in pkg/proxy/winkernel/proxier.go as part of
# kube-proxy as well. A health check port for every service that is specified as
# "externalTrafficPolicy: local" will be added on the endpoint.
# PLEASE KEEP THEM CONSISTENT!!!
$endpoint_name = "cbr0"

$vnic_name = "vEthernet (${endpoint_name})"

$hns_endpoint = Get-HnsEndpoint | Where-Object Name -eq $endpoint_name
Expand Down
2 changes: 2 additions & 0 deletions cmd/kube-proxy/app/init_windows.go
Expand Up @@ -41,4 +41,6 @@ func (o *Options) addOSFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.config.Winkernel.SourceVip, "source-vip", o.config.Winkernel.SourceVip, "The IP address of the source VIP for non-DSR.")
fs.StringVar(&o.config.Winkernel.NetworkName, "network-name", o.config.Winkernel.NetworkName, "The name of the cluster network.")
fs.BoolVar(&o.config.Winkernel.EnableDSR, "enable-dsr", o.config.Winkernel.EnableDSR, "If true make kube-proxy apply DSR policies for service VIP")
fs.StringVar(&o.config.Winkernel.RootHnsEndpointName, "root-hnsendpoint-name", "cbr0", "The name of the hns endpoint name for root namespace attached to l2bridge")
fs.BoolVar(&o.config.Winkernel.ForwardHealthCheckVip, "forward-healthcheck-vip", o.config.Winkernel.ForwardHealthCheckVip, "If true forward service VIP for health check port")
}
7 changes: 7 additions & 0 deletions cmd/kube-proxy/app/server_windows.go
Expand Up @@ -24,7 +24,9 @@ package app
import (
"errors"
"fmt"
"net"
goruntime "runtime"
"strconv"

// Enable pprof HTTP handlers.
_ "net/http/pprof"
Expand Down Expand Up @@ -97,8 +99,11 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
}

var healthzServer healthcheck.ProxierHealthUpdater
var healthzPort int
if len(config.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
_, port, _ := net.SplitHostPort(config.HealthzBindAddress)
healthzPort, _ = strconv.Atoi(port)
}

var proxier proxy.Provider
Expand All @@ -120,6 +125,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
recorder,
healthzServer,
config.Winkernel,
healthzPort,
)
} else {

Expand All @@ -134,6 +140,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
recorder,
healthzServer,
config.Winkernel,
healthzPort,
)

}
Expand Down
18 changes: 17 additions & 1 deletion pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -42,5 +42,7 @@ showHiddenMetricsForVersion: ""
udpIdleTimeout: 250ms
winkernel:
enableDSR: false
forwardHealthCheckVip: false
networkName: ""
rootHnsEndpointName: ""
sourceVip: ""
Expand Up @@ -42,5 +42,7 @@ showHiddenMetricsForVersion: ""
udpIdleTimeout: 250ms
winkernel:
enableDSR: false
forwardHealthCheckVip: false
networkName: ""
rootHnsEndpointName: ""
sourceVip: ""
6 changes: 6 additions & 0 deletions pkg/proxy/apis/config/types.go
Expand Up @@ -99,6 +99,12 @@ type KubeProxyWinkernelConfiguration struct {
// enableDSR tells kube-proxy whether HNS policies should be created
// with DSR
EnableDSR bool
// RootHnsEndpointName is the name of hnsendpoint that is attached to
// l2bridge for root network namespace
RootHnsEndpointName string
// ForwardHealthCheckVip forwards service VIP for health check port on
// Windows
ForwardHealthCheckVip bool
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
4 changes: 4 additions & 0 deletions pkg/proxy/apis/config/v1alpha1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 21 additions & 2 deletions pkg/proxy/winkernel/hnsV1.go
Expand Up @@ -33,6 +33,7 @@ type HostNetworkService interface {
getNetworkByName(name string) (*hnsNetworkInfo, error)
getEndpointByID(id string) (*endpointsInfo, error)
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
getEndpointByName(id string) (*endpointsInfo, error)
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
deleteEndpoint(hnsID string) error
getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error)
Expand Down Expand Up @@ -62,8 +63,9 @@ func (hns hnsV1) getEndpointByID(id string) (*endpointsInfo, error) {
return nil, err
}
return &endpointsInfo{
ip: hnsendpoint.IPAddress.String(),
isLocal: !hnsendpoint.IsRemoteEndpoint, //TODO: Change isLocal to isRemote
ip: hnsendpoint.IPAddress.String(),
//TODO: Change isLocal to isRemote
isLocal: !hnsendpoint.IsRemoteEndpoint,
macAddress: hnsendpoint.MacAddress,
hnsID: hnsendpoint.Id,
hns: hns,
Expand Down Expand Up @@ -108,6 +110,23 @@ func (hns hnsV1) getEndpointByIpAddress(ip string, networkName string) (*endpoin

return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}

func (hns hnsV1) getEndpointByName(name string) (*endpointsInfo, error) {
hnsendpoint, err := hcsshim.GetHNSEndpointByName(name)
if err != nil {
klog.ErrorS(err, "failed to get HNS endpoint by name", "name", name)
return nil, err
}
return &endpointsInfo{
ip: hnsendpoint.IPAddress.String(),
//TODO: Change isLocal to isRemote
isLocal: !hnsendpoint.IsRemoteEndpoint,
macAddress: hnsendpoint.MacAddress,
hnsID: hnsendpoint.Id,
hns: hns,
}, nil
}

func (hns hnsV1) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
hnsNetwork, err := hcsshim.GetHNSNetworkByName(networkName)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions pkg/proxy/winkernel/hnsV2.go
Expand Up @@ -114,6 +114,19 @@ func (hns hnsV2) getEndpointByIpAddress(ip string, networkName string) (*endpoin

return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}
func (hns hnsV2) getEndpointByName(name string) (*endpointsInfo, error) {
hnsendpoint, err := hcn.GetEndpointByName(name)
if err != nil {
return nil, err
}
return &endpointsInfo{ //TODO: fill out PA
ip: hnsendpoint.IpConfigurations[0].IpAddress,
isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
macAddress: hnsendpoint.MacAddress,
hnsID: hnsendpoint.Id,
hns: hns,
}, nil
}
func (hns hnsV2) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
hnsNetwork, err := hcn.GetNetworkByName(networkName)
if err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/proxy/winkernel/hns_test.go
Expand Up @@ -26,6 +26,8 @@ import (

"strings"
"testing"

"github.com/google/go-cmp/cmp"
)

const (
Expand Down Expand Up @@ -56,12 +58,12 @@ func TestGetEndpointByID(t *testing.T) {
testGetEndpointByID(t, hnsV1)
testGetEndpointByID(t, hnsV2)
}
func TestGetEndpointByIpAddress(t *testing.T) {
func TestGetEndpointByIpAddressAndName(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}

testGetEndpointByIpAddress(t, hnsV1)
testGetEndpointByIpAddress(t, hnsV2)
testGetEndpointByIpAddressAndName(t, hnsV1)
testGetEndpointByIpAddressAndName(t, hnsV2)
}
func TestCreateEndpointLocal(t *testing.T) {
hnsV1 := hnsV1{}
Expand Down Expand Up @@ -165,7 +167,7 @@ func testGetEndpointByID(t *testing.T, hns HostNetworkService) {
t.Error(err)
}
}
func testGetEndpointByIpAddress(t *testing.T, hns HostNetworkService) {
func testGetEndpointByIpAddressAndName(t *testing.T, hns HostNetworkService) {
Network := mustTestNetwork(t)

ipConfig := &hcn.IpConfig{
Expand Down Expand Up @@ -195,6 +197,15 @@ func testGetEndpointByIpAddress(t *testing.T, hns HostNetworkService) {
t.Errorf("%v does not match %v", endpoint.ip, Endpoint.IpConfigurations[0].IpAddress)
}

endpoint, err = hns.getEndpointByName(Endpoint.Name)
if err != nil {
t.Error(err)
}
diff := cmp.Diff(endpoint, Endpoint)
if diff != "" {
t.Errorf("getEndpointByName(%s) returned a different endpoint. Diff: %s ", Endpoint.Name, diff)
}

err = Endpoint.Delete()
if err != nil {
t.Error(err)
Expand Down
86 changes: 64 additions & 22 deletions pkg/proxy/winkernel/proxier.go
Expand Up @@ -87,8 +87,9 @@ type externalIPInfo struct {
}

type loadBalancerIngressInfo struct {
ip string
hnsID string
ip string
hnsID string
healthCheckHnsID string
}

type loadBalancerInfo struct {
Expand Down Expand Up @@ -548,6 +549,10 @@ type Proxier struct {
hostMac string
isDSR bool
supportedFeatures hcn.SupportedFeatures
healthzPort int

forwardHealthCheckVip bool
rootHnsEndpointName string
}

type localPort struct {
Expand Down Expand Up @@ -593,6 +598,7 @@ func NewProxier(
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
config config.KubeProxyWinkernelConfiguration,
healthzPort int,
) (*Proxier, error) {
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
Expand Down Expand Up @@ -684,24 +690,27 @@ func NewProxier(

isIPv6 := netutils.IsIPv6(nodeIP)
proxier := &Proxier{
endPointsRefCount: make(endPointsReferenceCountMap),
serviceMap: make(proxy.ServiceMap),
endpointsMap: make(proxy.EndpointsMap),
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
hns: hns,
network: *hnsNetworkInfo,
sourceVip: sourceVip,
hostMac: hostMac,
isDSR: isDSR,
supportedFeatures: supportedFeatures,
isIPv6Mode: isIPv6,
endPointsRefCount: make(endPointsReferenceCountMap),
serviceMap: make(proxy.ServiceMap),
endpointsMap: make(proxy.EndpointsMap),
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
hns: hns,
network: *hnsNetworkInfo,
sourceVip: sourceVip,
hostMac: hostMac,
isDSR: isDSR,
supportedFeatures: supportedFeatures,
isIPv6Mode: isIPv6,
healthzPort: healthzPort,
rootHnsEndpointName: config.RootHnsEndpointName,
forwardHealthCheckVip: config.ForwardHealthCheckVip,
}

ipFamily := v1.IPv4Protocol
Expand Down Expand Up @@ -730,18 +739,19 @@ func NewDualStackProxier(
recorder events.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
config config.KubeProxyWinkernelConfiguration,
healthzPort int,
) (proxy.Provider, error) {

// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit,
clusterCIDR, hostname, nodeIP[0], recorder, healthzServer, config)
clusterCIDR, hostname, nodeIP[0], recorder, healthzServer, config, healthzPort)

if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIP[0])
}

ipv6Proxier, err := NewProxier(syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit,
clusterCIDR, hostname, nodeIP[1], recorder, healthzServer, config)
clusterCIDR, hostname, nodeIP[1], recorder, healthzServer, config, healthzPort)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIP[1])
}
Expand Down Expand Up @@ -796,6 +806,10 @@ func (svcInfo *serviceInfo) deleteAllHnsLoadBalancerPolicy() {
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
hns.deleteLoadBalancer(lbIngressIP.hnsID)
lbIngressIP.hnsID = ""
if lbIngressIP.healthCheckHnsID != "" {
hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID)
lbIngressIP.healthCheckHnsID = ""
}
}
}

Expand Down Expand Up @@ -988,6 +1002,11 @@ func (proxier *Proxier) syncProxyRules() {
hnsNetworkName := proxier.network.name
hns := proxier.hns

var gatewayHnsendpoint *endpointsInfo
if proxier.forwardHealthCheckVip {
gatewayHnsendpoint, _ = hns.getEndpointByName(proxier.rootHnsEndpointName)
}

prevNetworkID := proxier.network.id
updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
Expand Down Expand Up @@ -1319,7 +1338,30 @@ func (proxier *Proxier) syncProxyRules() {
} else {
klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
}
lbIngressIP.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)

if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil {
nodeport := proxier.healthzPort
if svcInfo.HealthCheckNodePort() != 0 {
nodeport = svcInfo.HealthCheckNodePort()
}
hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
[]endpointsInfo{*gatewayHnsendpoint},
loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
sourceVip,
lbIngressIP.ip,
Enum(svcInfo.Protocol()),
uint16(nodeport),
uint16(nodeport),
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
continue
}
lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP)
}
}
svcInfo.policyApplied = true
klog.V(2).InfoS("Policy successfully applied for service", "serviceInfo", svcInfo)
Expand Down

0 comments on commit 41b29e6

Please sign in to comment.