diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go
index 945237dd1cee..dea19375fe59 100644
--- a/pkg/proxy/endpoints.go
+++ b/pkg/proxy/endpoints.go
@@ -121,7 +121,9 @@ func (info *BaseEndpointInfo) Port() (int, error) {
 
 // Equal is part of proxy.Endpoint interface.
 func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
-	return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
+	return info.String() == other.String() &&
+		info.GetIsLocal() == other.GetIsLocal() &&
+		info.IsReady() == other.IsReady()
 }
 
 // GetNodeName returns the NodeName for this endpoint.
@@ -536,13 +538,22 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.
 
 // detectStaleConnections modifies <staleEndpoints> and <staleServiceNames> with detected stale connections.
 // <staleServiceNames> is used to store stale udp services in order to clear udp conntrack later.
 func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
+	// Detect stale endpoints: an endpoint can have stale conntrack entries if it was receiving traffic
+	// and then goes unready or changes its IP address.
 	for svcPortName, epList := range oldEndpointsMap {
 		if svcPortName.Protocol != v1.ProtocolUDP {
 			continue
 		}
 
 		for _, ep := range epList {
+			// If the old endpoint wasn't ready, it cannot have stale entries,
+			// since no traffic was sent to it.
+			if !ep.IsReady() {
+				continue
+			}
 			stale := true
+			// Check whether the endpoint has changed, including whether it went from ready to not ready.
+			// If it did change, the stale entries for the old endpoint have to be cleared.
 			for i := range newEndpointsMap[svcPortName] {
 				if newEndpointsMap[svcPortName][i].Equal(ep) {
 					stale = false
@@ -556,13 +567,29 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, stale
 		}
 	}
 
+	// Detect stale services:
+	// if a udp service's backends change from 0 to a non-zero number of ready endpoints,
+	// there may exist a conntrack entry that could blackhole traffic to the service.
 	for svcPortName, epList := range newEndpointsMap {
 		if svcPortName.Protocol != v1.ProtocolUDP {
 			continue
 		}
 
-		// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
-		if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
+		epReady := 0
+		for _, ep := range epList {
+			if ep.IsReady() {
+				epReady++
+			}
+		}
+
+		oldEpReady := 0
+		for _, ep := range oldEndpointsMap[svcPortName] {
+			if ep.IsReady() {
+				oldEpReady++
+			}
+		}
+
+		if epReady > 0 && oldEpReady == 0 {
 			*staleServiceNames = append(*staleServiceNames, svcPortName)
 		}
 	}
diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go
index 15323340c8ec..1cc563652c43 100644
--- a/pkg/proxy/iptables/proxier.go
+++ b/pkg/proxy/iptables/proxier.go
@@ -164,7 +164,8 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
 	return e.Endpoint == o.Endpoint &&
 		e.IsLocal == o.IsLocal &&
 		e.protocol == o.protocol &&
-		e.chainName == o.chainName
+		e.chainName == o.chainName &&
+		e.Ready == o.Ready
 }
 
 // Returns the endpoint chain name for a given endpointsInfo.
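The core of the change above is that a UDP service is only treated as newly reachable, and therefore in need of a conntrack flush, when its count of ready endpoints goes from zero to non-zero. The following standalone sketch is illustrative only (a hypothetical `endpoint` type and helper names, not the real kube-proxy types) and mirrors that rule:

```go
// Minimal sketch of the readiness-aware stale-service check added in
// detectStaleConnections. The endpoint type and helpers are hypothetical
// stand-ins for the proxy.Endpoint interface used in the patch.
package main

import "fmt"

type endpoint struct {
	ip    string
	ready bool
}

// countReady returns how many endpoints are ready to receive traffic.
func countReady(eps []endpoint) int {
	n := 0
	for _, ep := range eps {
		if ep.ready {
			n++
		}
	}
	return n
}

// serviceBecameReachable reports whether the service went from zero ready
// endpoints to at least one, which is when stale conntrack entries that
// could blackhole traffic must be cleared.
func serviceBecameReachable(oldEps, newEps []endpoint) bool {
	return countReady(newEps) > 0 && countReady(oldEps) == 0
}

func main() {
	// An unready endpoint appears (e.g. its init container is still running):
	// no flush, because no iptables rules have been installed for it yet.
	unready := []endpoint{{ip: "10.0.0.5", ready: false}}
	fmt.Println(serviceBecameReachable(nil, unready)) // false

	// The same endpoint becomes ready: 0 -> 1 ready endpoints, flush now.
	ready := []endpoint{{ip: "10.0.0.5", ready: true}}
	fmt.Println(serviceBecameReachable(unready, ready)) // true
}
```

As in the patch, an EndpointSlice that contains only unready endpoints does not trigger a flush; the entries are cleared only once traffic can actually be delivered.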
diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go
index 66eb9bcf92d3..4bfaef22827f 100644
--- a/pkg/proxy/iptables/proxier_test.go
+++ b/pkg/proxy/iptables/proxier_test.go
@@ -3898,6 +3898,32 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
 			eps.AddressType = discovery.AddressTypeIPv4
 			eps.Endpoints = []discovery.Endpoint{{
 				Addresses: []string{epIP},
+				Conditions: discovery.EndpointConditions{
+					Ready: utilpointer.Bool(false),
+				},
+			}}
+			eps.Ports = []discovery.EndpointPort{{
+				Name:     utilpointer.StringPtr(svcPortName.Port),
+				Port:     utilpointer.Int32(int32(svcPort)),
+				Protocol: &udpProtocol,
+			}}
+		}),
+	)
+
+	fp.syncProxyRules()
+
+	if fexec.CommandCalls != 0 {
+		t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
+	}
+
+	populateEndpointSlices(fp,
+		makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
+			eps.AddressType = discovery.AddressTypeIPv4
+			eps.Endpoints = []discovery.Endpoint{{
+				Addresses: []string{epIP},
+				Conditions: discovery.EndpointConditions{
+					Ready: utilpointer.Bool(true),
+				},
 			}}
 			eps.Ports = []discovery.EndpointPort{{
 				Name:     utilpointer.StringPtr(svcPortName.Port),
diff --git a/test/e2e/network/conntrack.go b/test/e2e/network/conntrack.go
index 3fd25bbf8c0c..ee45c50a5e63 100644
--- a/test/e2e/network/conntrack.go
+++ b/test/e2e/network/conntrack.go
@@ -279,6 +279,79 @@ var _ = common.SIGDescribe("Conntrack", func() {
 		}
 	})
 
+	// Regression test for #105657
+	// 1. Create a UDP service.
+	// 2. Create a client pod sending traffic to the UDP service.
+	// 3. Create a UDP server associated with the service created in 1., with an init container that sleeps for some time.
+	// The init container keeps the server pod unready; the endpoint slices are still created, it is just
+	// that the endpoint condition Ready is false.
+	// If the kube-proxy conntrack logic doesn't check readiness, it will delete the conntrack entries for the UDP server
+	// as soon as the endpoint slice has been created, while the iptables rules will not be installed until at least one
+	// endpoint is ready. If any traffic arrives after kube-proxy clears the entries (on seeing the endpoint slice) and
+	// before it installs the corresponding iptables rules (once the endpoint is ready), a conntrack entry will be
+	// generated, blackholing subsequent traffic.
+	ginkgo.It("should be able to preserve UDP traffic when initial unready endpoints get ready", func() {
+
+		// Create a ClusterIP service
+		udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
+		ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns)
+		udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
+			svc.Spec.Type = v1.ServiceTypeClusterIP
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
+			}
+		})
+		framework.ExpectNoError(err)
+
+		// Create a pod on one node to generate UDP traffic against the ClusterIP service every 5 seconds
+		ginkgo.By("creating a client pod for probing the service " + serviceName)
+		clientPod := e2epod.NewAgnhostPod(ns, podClient, nil, nil, nil)
+		nodeSelection := e2epod.NodeSelection{Name: clientNodeInfo.name}
+		e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
+		cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port)
+		clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
+		clientPod.Spec.Containers[0].Name = podClient
+		fr.PodClient().CreateSync(clientPod)
+
+		// Read the client pod logs
+		logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
+		framework.ExpectNoError(err)
+		framework.Logf("Pod client logs: %s", logs)
+
+		// Add a backend pod for the service on the other node
+		ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
+		serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
+		serverPod1.Labels = udpJig.Labels
+		nodeSelection = e2epod.NodeSelection{Name: serverNodeInfo.name}
+		// Add an init container that delays the pod from becoming ready for 15 seconds
+		serverPod1.Spec.InitContainers = []v1.Container{
+			{
+				Name:    "init",
+				Image:   imageutils.GetE2EImage(imageutils.BusyBox),
+				Command: []string{"/bin/sh", "-c", "echo Pausing start. && sleep 15"},
+			},
+		}
+		e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
+		fr.PodClient().CreateSync(serverPod1)
+
+		// Wait until the endpoints are ready
+		validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{podBackend1: {80}})
+
+		// Note that the fact that the Endpoints object already exists does NOT mean
+		// that iptables (or whatever else is used) has already been programmed.
+		// Additionally, take into account that the UDP conntrack entry timeout is
+		// 30 seconds by default.
+		// Based on the above, check whether the pod receives the traffic.
+		ginkgo.By("checking client pod connected to the backend on Node IP " + serverNodeInfo.nodeIP)
+		if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1, podClient)); err != nil {
+			logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
+			framework.ExpectNoError(err)
+			framework.Logf("Pod client logs: %s", logs)
+			framework.Failf("Failed to connect to backend pod")
+		}
+
+	})
+
 	// Regression test for #74839, where:
 	// Packets considered INVALID by conntrack are now dropped. In particular, this fixes
 	// a problem where spurious retransmits in a long-running TCP connection to a service