Automated cherry pick of #106163: kube-proxy: fix stale detection logic #106239

Merged
33 changes: 30 additions & 3 deletions pkg/proxy/endpoints.go
@@ -121,7 +121,9 @@ func (info *BaseEndpointInfo) Port() (int, error) {

// Equal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
return info.String() == other.String() &&
info.GetIsLocal() == other.GetIsLocal() &&
info.IsReady() == other.IsReady()
}

// GetNodeName returns the NodeName for this endpoint.
@@ -536,13 +538,22 @@ func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.
// detectStaleConnections modifies <staleEndpoints> and <staleServiceNames> with detected stale connections.
// <staleServiceNames> is used to store stale UDP services in order to clear their UDP conntrack entries later.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
// Detect stale endpoints: an endpoint can have stale conntrack entries if it was receiving traffic
// and then goes unready or changes its IP address.
for svcPortName, epList := range oldEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}

for _, ep := range epList {
// If the old endpoint wasn't ready, it is not possible to have stale entries,
// since no traffic was sent to it.
if !ep.IsReady() {
continue
}
stale := true
// Check if the endpoint has changed, including whether it went from ready to not ready.
// If it did change, stale entries for the old endpoint have to be cleared.
for i := range newEndpointsMap[svcPortName] {
if newEndpointsMap[svcPortName][i].Equal(ep) {
stale = false
@@ -556,13 +567,29 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, stale
}
}

// Detect stale services:
// for a UDP service, if its backends change from 0 to a non-zero number of ready endpoints,
// there may exist a conntrack entry that could blackhole traffic to the service.
for svcPortName, epList := range newEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}

// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
epReady := 0
for _, ep := range epList {
if ep.IsReady() {
epReady++
}
}

oldEpReady := 0
for _, ep := range oldEndpointsMap[svcPortName] {
if ep.IsReady() {
oldEpReady++
}
}

if epReady > 0 && oldEpReady == 0 {
*staleServiceNames = append(*staleServiceNames, svcPortName)
}
}
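Aside for readers of the hunk above: a minimal, self-contained sketch of the readiness-aware rule the patch implements, using hypothetical endpoint/portName stand-ins rather than kube-proxy's real Endpoint and ServicePortName types. The idea: an old endpoint can only leave stale conntrack entries if it was ready (it may have received traffic), and a UDP service only needs its conntrack state flushed when its ready endpoints go from zero to non-zero.

package main

import "fmt"

// endpoint and portName are hypothetical stand-ins for kube-proxy's
// Endpoint and ServicePortName types, reduced to what the rule needs.
type endpoint struct {
	addr  string
	ready bool
}

type portName string

// staleEndpoints returns the old endpoints whose conntrack entries should be
// cleared: they were ready (so they may have received traffic) and they are
// no longer present, or no longer ready, in the new map.
func staleEndpoints(oldMap, newMap map[portName][]endpoint) []endpoint {
	var stale []endpoint
	for svc, oldEps := range oldMap {
		for _, oldEp := range oldEps {
			if !oldEp.ready {
				continue // never received traffic, nothing to clear
			}
			found := false
			for _, newEp := range newMap[svc] {
				// Equality includes readiness, mirroring the Equal changes above.
				if newEp.addr == oldEp.addr && newEp.ready == oldEp.ready {
					found = true
					break
				}
			}
			if !found {
				stale = append(stale, oldEp)
			}
		}
	}
	return stale
}

// staleServices returns the services whose ready endpoints went from zero to
// non-zero; a conntrack entry created while the service had no ready backends
// could otherwise keep blackholing traffic.
func staleServices(oldMap, newMap map[portName][]endpoint) []portName {
	countReady := func(eps []endpoint) int {
		n := 0
		for _, ep := range eps {
			if ep.ready {
				n++
			}
		}
		return n
	}
	var stale []portName
	for svc, newEps := range newMap {
		if countReady(newEps) > 0 && countReady(oldMap[svc]) == 0 {
			stale = append(stale, svc)
		}
	}
	return stale
}

func main() {
	oldMap := map[portName][]endpoint{"ns/svc:udp": {{addr: "10.0.0.1:80", ready: false}}}
	newMap := map[portName][]endpoint{"ns/svc:udp": {{addr: "10.0.0.1:80", ready: true}}}
	fmt.Println(staleEndpoints(oldMap, newMap)) // prints []: the old endpoint was never ready, so nothing is stale
	fmt.Println(staleServices(oldMap, newMap))  // prints [ns/svc:udp]: ready endpoints went from 0 to 1
}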
3 changes: 2 additions & 1 deletion pkg/proxy/iptables/proxier.go
@@ -164,7 +164,8 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
return e.Endpoint == o.Endpoint &&
e.IsLocal == o.IsLocal &&
e.protocol == o.protocol &&
e.chainName == o.chainName
e.chainName == o.chainName &&
e.Ready == o.Ready
}

// Returns the endpoint chain name for a given endpointsInfo.
26 changes: 26 additions & 0 deletions pkg/proxy/iptables/proxier_test.go
@@ -3232,6 +3232,32 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.Bool(false),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
Port: utilpointer.Int32(int32(svcPort)),
Protocol: &udpProtocol,
}}
}),
)

fp.syncProxyRules()

if fexec.CommandCalls != 0 {
t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
}

populateEndpointSlices(fp,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{
Ready: utilpointer.Bool(true),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
73 changes: 73 additions & 0 deletions test/e2e/network/conntrack.go
@@ -279,6 +279,79 @@ var _ = common.SIGDescribe("Conntrack", func() {
}
})

// Regression test for #105657
// 1. Create a UDP Service
// 2. Create a client Pod that sends traffic to the UDP service
// 3. Create a UDP server associated with the Service created in 1. with an init container that sleeps for some time
// The init container keeps the server pod not ready; the endpoint slices are still created, it is just
// that the endpoint's Ready condition is false.
// If the kube-proxy conntrack logic doesn't check readiness, it will delete the conntrack entries for the UDP server
// as soon as the endpoint slice has been created; however, the iptables rules will not be installed until at least one
// endpoint is ready. If some traffic arrives between the moment kube-proxy clears the entries (sees the endpoint slice)
// and the moment it installs the corresponding iptables rules (the endpoint becomes ready), a conntrack entry will be
// generated that blackholes subsequent traffic.
ginkgo.It("should be able to preserve UDP traffic when initial unready endpoints get ready", func() {

// Create a ClusterIP service
udpJig := e2eservice.NewTestJig(cs, ns, serviceName)
ginkgo.By("creating a UDP service " + serviceName + " with type=ClusterIP in " + ns)
udpService, err := udpJig.CreateUDPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "udp", Protocol: v1.ProtocolUDP, TargetPort: intstr.FromInt(80)},
}
})
framework.ExpectNoError(err)

// Create a pod on one node that sends UDP traffic to the ClusterIP service every 5 seconds
ginkgo.By("creating a client pod for probing the service " + serviceName)
clientPod := e2epod.NewAgnhostPod(ns, podClient, nil, nil, nil)
nodeSelection := e2epod.NodeSelection{Name: clientNodeInfo.name}
e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do echo "$(date) Try: ${i}"; echo hostname | nc -u -w 5 -p %d %s %d; echo; done`, srcPort, udpService.Spec.ClusterIP, udpService.Spec.Ports[0].Port)
clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
clientPod.Spec.Containers[0].Name = podClient
fr.PodClient().CreateSync(clientPod)

// Read the client pod logs
logs, err := e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)

// Add a backend pod for the service on the other node
ginkgo.By("creating a backend pod " + podBackend1 + " for the service " + serviceName)
serverPod1 := e2epod.NewAgnhostPod(ns, podBackend1, nil, nil, nil, "netexec", fmt.Sprintf("--udp-port=%d", 80))
serverPod1.Labels = udpJig.Labels
nodeSelection = e2epod.NodeSelection{Name: serverNodeInfo.name}
// Add an init container to keep the pod from becoming ready for 15 seconds
serverPod1.Spec.InitContainers = []v1.Container{
{
Name: "init",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"/bin/sh", "-c", "echo Pausing start. && sleep 15"},
},
}
e2epod.SetNodeSelection(&serverPod1.Spec, nodeSelection)
fr.PodClient().CreateSync(serverPod1)

// wait until the endpoints are ready
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{podBackend1: {80}})

// Note that the fact that the Endpoints object already exists does NOT mean
// that iptables (or whatever else is used) has already been programmed.
// Additionally, take into account that the UDP conntrack entry timeout is
// 30 seconds by default.
// Based on the above, check whether the pod receives the traffic.
ginkgo.By("checking client pod connected to the backend on Node IP " + serverNodeInfo.nodeIP)
if err := wait.PollImmediate(5*time.Second, time.Minute, logContainsFn(podBackend1, podClient)); err != nil {
logs, err = e2epod.GetPodLogs(cs, ns, podClient, podClient)
framework.ExpectNoError(err)
framework.Logf("Pod client logs: %s", logs)
framework.Failf("Failed to connect to backend pod")
}

})

// Regression test for #74839, where:
// Packets considered INVALID by conntrack are now dropped. In particular, this fixes
// a problem where spurious retransmits in a long-running TCP connection to a service
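A note on what "clearing the entries" means in the e2e test above: once detectStaleConnections flags a UDP service, kube-proxy deletes the matching conntrack flows so the next datagram is evaluated against the newly installed iptables rules. The sketch below approximates that with os/exec and the conntrack(8) CLI; the flags are standard conntrack-tools options, but the helper name clearUDPConntrackForIP and the exact invocation are illustrative assumptions; the real helper lives in k8s.io/kubernetes/pkg/util/conntrack and may build the command differently (for example to handle IPv6).

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// clearUDPConntrackForIP deletes conntrack entries whose original destination
// is the given service IP and whose protocol is UDP. This approximates what
// kube-proxy does for services returned in staleServiceNames; it is a sketch,
// not the actual kube-proxy helper.
func clearUDPConntrackForIP(serviceIP string) error {
	// -D deletes matching flows, --orig-dst matches the original (pre-DNAT)
	// destination address, -p udp restricts the match to UDP.
	out, err := exec.Command("conntrack", "-D", "--orig-dst", serviceIP, "-p", "udp").CombinedOutput()
	if err != nil {
		// conntrack exits non-zero when no flows matched; callers typically
		// treat "nothing to delete" as success rather than an error.
		if strings.Contains(string(out), "0 flow entries") {
			return nil
		}
		return fmt.Errorf("clearing UDP conntrack entries for %s: %v: %s", serviceIP, err, out)
	}
	return nil
}

func main() {
	// Hypothetical ClusterIP; running this requires the conntrack binary and NET_ADMIN.
	if err := clearUDPConntrackForIP("10.96.0.10"); err != nil {
		fmt.Println(err)
	}
}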