Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HNS Load Balancer Health Checks for ExternalTrafficPolicy: Local #96998

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/proxy/winkernel/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/github.com/Microsoft/hcsshim/hcn:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
"//conditions:default": [],
Expand Down
22 changes: 20 additions & 2 deletions pkg/proxy/winkernel/hnsV1.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ package winkernel
import (
"encoding/json"
"fmt"
"github.com/Microsoft/hcsshim"
"k8s.io/klog/v2"
"net"
"strings"

"github.com/Microsoft/hcsshim"
"k8s.io/klog/v2"
)

type HostNetworkService interface {
getNetworkByName(name string) (*hnsNetworkInfo, error)
getEndpointByID(id string) (*endpointsInfo, error)
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
getEndpointByName(id string) (*endpointsInfo, error)
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
deleteEndpoint(hnsID string) error
getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error)
Expand Down Expand Up @@ -106,6 +108,22 @@ func (hns hnsV1) getEndpointByIpAddress(ip string, networkName string) (*endpoin

return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}

// getEndpointByName fetches the HNS endpoint registered under name through the
// hcsshim (HNS v1) API and converts it into an endpointsInfo bound to this
// hnsV1 instance. A lookup failure is logged and handed back to the caller
// unchanged, with a nil endpoint.
func (hns hnsV1) getEndpointByName(name string) (*endpointsInfo, error) {
	found, lookupErr := hcsshim.GetHNSEndpointByName(name)
	if lookupErr != nil {
		klog.Errorf("%v", lookupErr)
		return nil, lookupErr
	}

	info := new(endpointsInfo)
	info.ip = found.IPAddress.String()
	// isLocal is the negation of the endpoint's remote marker.
	info.isLocal = !found.IsRemoteEndpoint //TODO: Change isLocal to isRemote
	info.macAddress = found.MacAddress
	info.hnsID = found.Id
	info.hns = hns
	return info, nil
}

func (hns hnsV1) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
hnsNetwork, err := hcsshim.GetHNSNetworkByName(networkName)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions pkg/proxy/winkernel/hnsV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ func (hns hnsV2) getEndpointByIpAddress(ip string, networkName string) (*endpoin

return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}
// getEndpointByName fetches the HNS endpoint registered under name through the
// hcn (HNS v2) API and converts it into an endpointsInfo bound to this hnsV2
// instance.
//
// It returns the lookup error unchanged when hcn cannot resolve the name, and
// an error when the endpoint carries no IP configuration, because the
// endpoint's address is taken from its first IP configuration.
func (hns hnsV2) getEndpointByName(name string) (*endpointsInfo, error) {
	hnsendpoint, err := hcn.GetEndpointByName(name)
	if err != nil {
		return nil, err
	}
	// Guard the IpConfigurations[0] access below: indexing an empty slice
	// would panic instead of reporting a usable error to the caller.
	if len(hnsendpoint.IpConfigurations) == 0 {
		return nil, fmt.Errorf("endpoint %q has no IP configurations", name)
	}
	return &endpointsInfo{ //TODO: fill out PA
		ip: hnsendpoint.IpConfigurations[0].IpAddress,
		// The endpoint is local when the remote-endpoint flag bit is not set.
		isLocal:    uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
		macAddress: hnsendpoint.MacAddress,
		hnsID:      hnsendpoint.Id,
		hns:        hns,
	}, nil
}
func (hns hnsV2) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
hnsNetwork, err := hcn.GetNetworkByName(networkName)
if err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/proxy/winkernel/hns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

"strings"
"testing"

"github.com/google/go-cmp/cmp"
)

const (
Expand Down Expand Up @@ -55,12 +57,12 @@ func TestGetEndpointByID(t *testing.T) {
testGetEndpointByID(t, hnsV1)
testGetEndpointByID(t, hnsV2)
}
func TestGetEndpointByIpAddress(t *testing.T) {
func TestGetEndpointByIpAddressAndName(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}

testGetEndpointByIpAddress(t, hnsV1)
testGetEndpointByIpAddress(t, hnsV2)
testGetEndpointByIpAddressAndName(t, hnsV1)
testGetEndpointByIpAddressAndName(t, hnsV2)
}
func TestCreateEndpointLocal(t *testing.T) {
hnsV1 := hnsV1{}
Expand Down Expand Up @@ -164,7 +166,7 @@ func testGetEndpointByID(t *testing.T, hns HostNetworkService) {
t.Error(err)
}
}
func testGetEndpointByIpAddress(t *testing.T, hns HostNetworkService) {
func testGetEndpointByIpAddressAndName(t *testing.T, hns HostNetworkService) {
Network := mustTestNetwork(t)

ipConfig := &hcn.IpConfig{
Expand Down Expand Up @@ -194,6 +196,15 @@ func testGetEndpointByIpAddress(t *testing.T, hns HostNetworkService) {
t.Errorf("%v does not match %v", endpoint.ip, Endpoint.IpConfigurations[0].IpAddress)
}

endpoint, err = hns.getEndpointByName(Endpoint.Name)
if err != nil {
t.Error(err)
}
diff := cmp.Diff(endpoint, Endpoint)
if diff != "" {
t.Errorf("getEndpointByName(%s) returned a different endpoint. Diff: %s ", Endpoint.Name, diff)
}

err = Endpoint.Delete()
if err != nil {
t.Error(err)
Expand Down
24 changes: 22 additions & 2 deletions pkg/proxy/winkernel/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ type externalIPInfo struct {
}

type loadBalancerIngressInfo struct {
ip string
hnsID string
ip string
hnsID string
healthCheckHnsID string
}

type loadBalancerInfo struct {
Expand Down Expand Up @@ -728,6 +729,8 @@ func (svcInfo *serviceInfo) deleteAllHnsLoadBalancerPolicy() {
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
hns.deleteLoadBalancer(lbIngressIP.hnsID)
lbIngressIP.hnsID = ""
hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID)
lbIngressIP.healthCheckHnsID = ""
}
}

Expand Down Expand Up @@ -954,6 +957,7 @@ func (proxier *Proxier) syncProxyRules() {

hnsNetworkName := proxier.network.name
hns := proxier.hns
cbr0HnsEndpoint, _ := hns.getEndpointByName("cbr0")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also looks like this can collide because there could be multiple endpoints with the same name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I don't think we should be putting GCP-specific logic here. Where is the current cbr0 endpoint being created?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not usually apply policies to endpoints other than the one that is part of the service.

What is the actual scenario/data path you are trying to enable with this fix?
Can you please describe it? That would help us figure out what the correct solution is here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is trying to let the GCE load balancer health-check the nodes. The GCE load balancer does not do DNAT: instead of getting <node-ip>:<health-check-node-port>, the node gets <service-vip>:<health-check-node-port>. So we need to do this to forward the health check packet to kube-proxy's internal health check app.

I have another PR open for further review: #99287. Let's move discussion there.


prevNetworkID := proxier.network.id
updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
Expand Down Expand Up @@ -1270,6 +1274,22 @@ func (proxier *Proxier) syncProxyRules() {
}
lbIngressIP.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)

hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
[]endpointsInfo{*cbr0HnsEndpoint},
loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
sourceVip,
lbIngressIP.ip,
Enum(svcInfo.Protocol()),
uint16(svcInfo.HealthCheckNodePort()),
uint16(svcInfo.HealthCheckNodePort()),
)
if err != nil {
klog.Errorf("Policy creation failed: %v", err)
continue
}
lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
klog.V(3).Infof("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources %v", lbIngressIP)
}
svcInfo.policyApplied = true
Log(svcInfo, "+++Policy Successfully applied for service +++", 2)
Expand Down
21 changes: 15 additions & 6 deletions pkg/proxy/winkernel/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@ package winkernel

import (
"fmt"
"k8s.io/api/core/v1"
"net"
"strings"
"testing"
"time"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilpointer "k8s.io/utils/pointer"
"net"
"strings"
"testing"
"time"
)

const (
Expand Down Expand Up @@ -70,6 +71,10 @@ func (hns fakeHNS) getEndpointByID(id string) (*endpointsInfo, error) {
return nil, nil
}

// getEndpointByName is the fake's stub for name-based endpoint lookup. It
// ignores its argument and always yields a nil endpoint together with a nil
// error.
// NOTE(review): a caller that dereferences the returned pointer without a nil
// check would panic against this fake — confirm callers guard for it.
func (hns fakeHNS) getEndpointByName(id string) (*endpointsInfo, error) {
	var noEndpoint *endpointsInfo
	return noEndpoint, nil
}

func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
_, ipNet, _ := net.ParseCIDR(destinationPrefix)

Expand Down Expand Up @@ -703,7 +708,6 @@ func TestCreateLoadBalancer(t *testing.T) {
t.Errorf("%v does not match %v", svcInfo.hnsID, guid)
}
}

}

func TestCreateDsrLoadBalancer(t *testing.T) {
Expand Down Expand Up @@ -765,6 +769,11 @@ func TestCreateDsrLoadBalancer(t *testing.T) {
if svcInfo.localTrafficDSR != true {
t.Errorf("Failed to create DSR loadbalancer with local traffic policy")
}
if len(svcInfo.loadBalancerIngressIPs) == 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test condition fails. Looking at the structs it's not clear what should actually be compared here or what the expectation is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fake needs to be changed a bit.

t.Errorf("svcInfo does not have any loadBalancerIngressIPs, %+v", svcInfo)
} else if svcInfo.loadBalancerIngressIPs[0].healthCheckHnsID != guid {
t.Errorf("The Hns Loadbalancer HealthCheck Id %v does not match %v. ServicePortName %q", svcInfo.loadBalancerIngressIPs[0].healthCheckHnsID, guid, svcPortName.String())
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions test/e2e/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,9 +894,10 @@ var _ = SIGDescribe("Services", func() {
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{})
})

ginkgo.It("should preserve source pod IP for traffic thru service cluster IP [LinuxOnly]", func() {
ginkgo.It("should preserve source pod IP for traffic thru service cluster IP", func() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test does not pass locally. I'm not sure why yet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this test is related. It creates a ClusterIP service and checks whether the source IP is preserved. This PR's case is about LoadBalancer services.

// TODO(jeremyje): Determine which parts of this test work.
// this test is creating a pod with HostNetwork=true, which is not supported on Windows.
e2eskipper.SkipIfNodeOSDistroIs("windows")
//e2eskipper.SkipIfNodeOSDistroIs("windows")

// This behavior is not supported if Kube-proxy is in "userspace" mode.
// So we check the kube-proxy mode and skip this test if that's the case.
Expand Down