kube-proxy consider endpoint readiness to delete UDP stale conntrack entries #106163

Merged (1 commit) on Nov 8, 2021
33 changes: 30 additions & 3 deletions pkg/proxy/endpoints.go
@@ -121,7 +121,9 @@ func (info *BaseEndpointInfo) Port() (int, error) {

// Equal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
return info.String() == other.String() &&
info.GetIsLocal() == other.GetIsLocal() &&
info.IsReady() == other.IsReady()
}

// GetNodeName returns the NodeName for this endpoint.
@@ -536,13 +538,22 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.
// detectStaleConnections modifies <staleEndpoints> and <staleServices> with detected stale connections. <staleServiceNames>
// is used to store stale udp service in order to clear udp conntrack later.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
// Detect stale endpoints: an endpoint can have stale conntrack entries if it was receiving traffic
// and then went unready or changed its IP address.
for svcPortName, epList := range oldEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}

for _, ep := range epList {
// If the old endpoint wasn't ready, it is not possible to have stale entries
// since there was no traffic sent to it.
if !ep.IsReady() {
continue
}
stale := true
// Check if the endpoint has changed, including whether it went from ready to not ready.
// If it did change, stale entries for the old endpoint have to be cleared.
for i := range newEndpointsMap[svcPortName] {
if newEndpointsMap[svcPortName][i].Equal(ep) {
stale = false
@@ -556,13 +567,29 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, stale
}
}

// Detect stale services
// For a UDP service, if its backends change from 0 to non-0 ready endpoints,
// there may exist a conntrack entry that could blackhole traffic to the service.
for svcPortName, epList := range newEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}

// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
epReady := 0
for _, ep := range epList {
if ep.IsReady() {
epReady++
Contributor:

So... this doesn't take ProxyTerminatingEndpoints into account...

Consider the case where an externalTrafficPolicy: Local service has a single Serving-Terminating endpoint. Connections come in to that endpoint's node and are accepted and processed by the terminating pod. Then a new endpoint starts up and becomes Ready. Given the code here, that would be interpreted as "the service went from 0 endpoints to non-0 endpoints", and so the node with the Serving-Terminating endpoint would flush all conntrack entries for the service, breaking the existing connections to the Serving-Terminating pod.

(Also, this patch changes the rules for staleServices, but there are terminating-endpoint problems with staleEndpoints too; we used to delete conntrack entries to endpoints as soon as the endpoint became non-ready, but now we don't delete them until the pod is fully deleted...)

Member Author (@aojea, Nov 5, 2021):

> So... this doesn't take ProxyTerminatingEndpoints into account...

This is a regression that needs to be backported, and ProxyTerminatingEndpoints is an alpha feature (no backports for alpha features are allowed). Also, after the analysis you did in your related PR, I don't think it is easy to solve both problems at the same time 😅

> breaking the existing connections to the Serving-Terminating pod.

It is UDP, in the sense that it is not breaking the connection per se, since it is connectionless; the next packet will create a new entry by going through the iptables rules, which should still exist ... it is less performant because the packet is processed through the iptables chain again, but not a big deal (at least I can't see how this can break something, UDP is unreliable).

> (Also, this patch changes the rules for staleServices, but there are terminating-endpoint problems with staleEndpoints too; we used to delete conntrack entries to endpoints as soon as the endpoint became non-ready, but now we don't delete them until the pod is fully deleted...)

That is fixed by the Equal change taking Ready into consideration: https://github.com/kubernetes/kubernetes/pull/106163/files#r743336980

Contributor:

> It is UDP, in the sense that it is not breaking the connection per se, since it is connectionless

UDP is connectionless at L4, but not necessarily at L7. That's the main reason UDP conntrack records exist. Eg, anything using Datagram TLS (like QUIC / HTTP/3) won't survive being switched to a different endpoint mid-communication, because the new endpoint won't have the encryption key it needs.

Member Author (@aojea):

👀 I can't argue against that, but it seems we are going to have some fun soon 😄

}
}

oldEpReady := 0
for _, ep := range oldEndpointsMap[svcPortName] {
if ep.IsReady() {
oldEpReady++
}
}

if epReady > 0 && oldEpReady == 0 {
*staleServiceNames = append(*staleServiceNames, svcPortName)
}
}
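For context, here is a minimal sketch of what the staleServiceNames result ultimately drives on the node: kube-proxy shells out to the conntrack(8) tool and deletes UDP entries whose original destination is the service's ClusterIP, so the next datagram is matched against the freshly programmed iptables rules instead of a stale blackhole entry. This is a sketch only; the helper name and the ClusterIP below are illustrative, not the exact proxier code.

```go
// Sketch only: roughly what kube-proxy's conntrack cleanup amounts to for a
// stale UDP service, assuming the conntrack(8) binary is present on the node.
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// clearUDPEntriesForServiceIP deletes UDP conntrack entries whose original
// destination is the given service ClusterIP.
func clearUDPEntriesForServiceIP(ip string) error {
	out, err := exec.Command("conntrack", "-D", "--orig-dst", ip, "-p", "udp").CombinedOutput()
	// conntrack may exit non-zero when there was nothing to delete; treat
	// that case as success rather than an error.
	if err != nil && !strings.Contains(string(out), "0 flow entries") {
		return fmt.Errorf("conntrack -D --orig-dst %s -p udp: %v: %s", ip, err, out)
	}
	return nil
}

func main() {
	// Hypothetical ClusterIP of a UDP service that just went from 0 to >0
	// ready endpoints.
	if err := clearUDPEntriesForServiceIP("10.96.0.42"); err != nil {
		fmt.Println(err)
	}
}
```

Flushing only when a ready endpoint appears, rather than when any endpoint object appears, is the point of the change above: flushing while all endpoints are still unready races with rule programming and can blackhole traffic, which is the scenario the e2e test below reproduces.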
3 changes: 2 additions & 1 deletion pkg/proxy/iptables/proxier.go
@@ -164,7 +164,8 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
return e.Endpoint == o.Endpoint &&
e.IsLocal == o.IsLocal &&
e.protocol == o.protocol &&
e.chainName == o.chainName
e.chainName == o.chainName &&
e.Ready == o.Ready
}

// Returns the endpoint chain name for a given endpointsInfo.
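To make the effect of including Ready in the endpoint equality check concrete, here is a small self-contained sketch (the struct and values are illustrative stand-ins, not the real proxier types): an endpoint whose address and locality are unchanged but whose readiness flipped now compares unequal, so detectStaleConnections treats the old entry as stale and its UDP conntrack entries are cleared as soon as it goes unready, rather than only when the pod is finally deleted.

```go
package main

import "fmt"

// endpoint is a stand-in for the proxier's endpoint info; only the fields
// relevant to the comparison are modeled.
type endpoint struct {
	addr    string // "IP:port"
	isLocal bool
	ready   bool
}

// equal mirrors the change in this PR: readiness is now part of endpoint identity.
func (e endpoint) equal(o endpoint) bool {
	return e.addr == o.addr && e.isLocal == o.isLocal && e.ready == o.ready
}

func main() {
	prev := endpoint{addr: "10.244.1.5:80", isLocal: false, ready: true}
	curr := endpoint{addr: "10.244.1.5:80", isLocal: false, ready: false}

	// Before this PR the two would compare equal (same address, same
	// locality), so merely going unready never flagged the endpoint as
	// stale. With readiness included, the comparison fails and the stale
	// UDP conntrack entries for the old endpoint get cleared right away.
	fmt.Println("stale:", !prev.equal(curr)) // prints: stale: true
}
```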
26 changes: 26 additions & 0 deletions pkg/proxy/iptables/proxier_test.go
@@ -3898,6 +3898,32 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.Bool(false),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
Port: utilpointer.Int32(int32(svcPort)),
Protocol: &udpProtocol,
}}
}),
)

fp.syncProxyRules()

if fexec.CommandCalls != 0 {
t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
}

populateEndpointSlices(fp,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.Bool(true),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
73 changes: 73 additions & 0 deletions test/e2e/network/conntrack.go
@@ -279,6 +279,79 @@ var _ = common.SIGDescribe("Conntrack", func() {
}
})

// Regression test for #105657
// 1. Create a UDP Service
// 2. Client Pod sending traffic to the UDP service
// 3. Create a UDP server associated to the Service created in 1. with an init container that sleeps for some time
// The init container keeps the server pod not ready; the endpoint slices are still created, it is just
// that the Endpoint condition Ready is false.
// If the kube-proxy conntrack logic doesn't check readiness, it will delete the conntrack entries for the UDP server
// as soon as the endpoint slice is created; however, the iptables rules will not be installed until at least one
// endpoint is ready. If some traffic arrives between the time kube-proxy clears the entries (sees the endpoint slice)
// and the time it installs the corresponding iptables rules (the endpoint becomes ready), a conntrack entry will be
// generated, blackholing subsequent traffic.
ginkgo.It("should be able to preserve UDP traffic when initial unready endpoints get ready", func() {

// Create a ClusterIP service
udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns)
udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
}
})
framework.ExpectNoError(err)

// Create a pod in one node to generate UDP traffic against the ClusterIP service every 5 seconds
ginkgo.By("creating a client pod for probing the service " + serviceName)
clientPod := e2epod.NewAgnhostPod(ns, podClient, nil, nil, nil)
nodeSelection := e2epod.NodeSelection{Name: clientNodeInfo.name}
e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port)
clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
clientPod.Spec.Containers[0].Name = podClient
fr.PodClient().CreateSync(clientPod)

// Read the client pod logs
logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)

// Add a backend pod to the service in the other node
ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
serverPod1.Labels = udpJig.Labels
nodeSelection = e2epod.NodeSelection{Name: serverNodeInfo.name}
// Add an init container to keep the pod from becoming ready for 15 seconds
serverPod1.Spec.InitContainers = []v1.Container{
{
Name: "init",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"/bin/sh", "-c", "echo Pausing start. && sleep 15"},
},
}
e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
fr.PodClient().CreateSync(serverPod1)

// wait until the endpoints are ready
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{podBackend1: {80}})

// Note that the fact that the Endpoints object already exists does NOT mean
// that iptables (or whatever else is used) was already programmed.
// Additionally, take into account that the UDP conntrack entry timeout is
// 30 seconds by default.
// Based on the above, check if the pod receives the traffic.
ginkgo.By("checking client pod connected to the backend on Node IP " + serverNodeInfo.nodeIP)
if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1, podClient)); err != nil {
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)
framework.Failf("Failed to connect to backend pod")
}

})

// Regression test for #74839, where:
// Packets considered INVALID by conntrack are now dropped. In particular, this fixes
// a problem where spurious retransmits in a long-running TCP connection to a service