Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sync_proxy_rules_no_endpoints_total metric #108930

Merged
merged 5 commits into from Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 27 additions & 10 deletions pkg/proxy/iptables/proxier.go
Expand Up @@ -38,9 +38,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
Expand Down Expand Up @@ -993,6 +995,11 @@ func (proxier *Proxier) syncProxyRules() {
}
}

// These two variables are used to publish the sync_proxy_rules_no_endpoints_total
// metric.
serviceNoLocalEndpointsTotalInternal := 0
serviceNoLocalEndpointsTotalExternal := 0

// Build rules for each service-port.
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
Expand Down Expand Up @@ -1347,16 +1354,24 @@ func (proxier *Proxier) syncProxyRules() {
if len(localEndpoints) != 0 {
// Write rules jumping from localPolicyChain to localEndpointChains
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, localPolicyChain, localEndpoints, args)
} else if hasEndpoints {
// Blackhole all traffic since there are no local endpoints
args = append(args[:0],
"-A", string(localPolicyChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
"-j",
string(KubeMarkDropChain),
)
proxier.natRules.Write(args)
} else {
if svcInfo.InternalPolicyLocal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
serviceNoLocalEndpointsTotalInternal++
}
if svcInfo.ExternalPolicyLocal() {
serviceNoLocalEndpointsTotalExternal++
}
if hasEndpoints {
// Blackhole all traffic since there are no local endpoints
args = append(args[:0],
"-A", string(localPolicyChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
"-j",
string(KubeMarkDropChain),
)
proxier.natRules.Write(args)
}
}
}
}
Expand Down Expand Up @@ -1478,6 +1493,8 @@ func (proxier *Proxier) syncProxyRules() {
}
}

metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(serviceNoLocalEndpointsTotalInternal))
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(serviceNoLocalEndpointsTotalExternal))
if proxier.healthzServer != nil {
proxier.healthzServer.Updated()
}
Expand Down
154 changes: 154 additions & 0 deletions pkg/proxy/iptables/proxier_test.go
Expand Up @@ -6050,3 +6050,157 @@ func TestEndpointCommentElision(t *testing.T) {
t.Errorf("numComments (%d) != 0 when numEndpoints (%d) > threshold (%d)", numComments, numEndpoints, endpointChainsNumberThreshold)
}
}

func TestNoEndpointsMetric(t *testing.T) {
type endpoint struct {
ip string
hostname string
}

internalTrafficPolicyLocal := v1.ServiceInternalTrafficPolicyLocal
externalTrafficPolicyLocal := v1.ServiceExternalTrafficPolicyTypeLocal

metrics.RegisterMetrics()
testCases := []struct {
name string
internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType
externalTrafficPolicy v1.ServiceExternalTrafficPolicyType
endpoints []endpoint
expectedSyncProxyRulesNoLocalEndpointsTotalInternal int
expectedSyncProxyRulesNoLocalEndpointsTotalExternal int
}{
{
name: "internalTrafficPolicy is set and there is non-zero local endpoints",
internalTrafficPolicy: &internalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", testHostname},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
},
{
name: "externalTrafficPolicy is set and there is non-zero local endpoints",
externalTrafficPolicy: externalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", testHostname},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
},
{
name: "both policies are set and there is non-zero local endpoints",
internalTrafficPolicy: &internalTrafficPolicyLocal,
externalTrafficPolicy: externalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", testHostname},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
},
{
name: "internalTrafficPolicy is set and there is zero local endpoint",
internalTrafficPolicy: &internalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", "host0"},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1,
},
{
name: "externalTrafficPolicy is set and there is zero local endpoint",
externalTrafficPolicy: externalTrafficPolicyLocal,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be useful to add tests where we set both the internal and external traffic policy, or collapse the existing tests into two tests where both policies are set

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test

endpoints: []endpoint{
{"10.0.1.1", "host0"},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1,
},
{
name: "both policies are set and there is zero local endpoint",
internalTrafficPolicy: &internalTrafficPolicyLocal,
externalTrafficPolicy: externalTrafficPolicyLocal,
endpoints: []endpoint{
{"10.0.1.1", "host0"},
{"10.0.1.2", "host1"},
{"10.0.1.3", "host2"},
},
expectedSyncProxyRulesNoLocalEndpointsTotalInternal: 1,
expectedSyncProxyRulesNoLocalEndpointsTotalExternal: 1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to have some test where the metric is more than 1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this as a quick follow-up? It involves refactoring the test to include dynamically generating services. Since we're after the freeze, I'd like to get this one in

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK

},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, true)()
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.OnServiceSynced()
fp.OnEndpointSlicesSynced()

serviceName := "svc1"
namespaceName := "ns1"

svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.30.1.1",
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Name: "", Port: 80, Protocol: v1.ProtocolTCP, NodePort: 123}},
},
}
if tc.internalTrafficPolicy != nil {
svc.Spec.InternalTrafficPolicy = tc.internalTrafficPolicy
}
if tc.externalTrafficPolicy != "" {
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.ExternalTrafficPolicy = tc.externalTrafficPolicy
}

fp.OnServiceAdd(svc)

tcpProtocol := v1.ProtocolTCP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
}
for _, ep := range tc.endpoints {
endpointSlice.Endpoints = append(endpointSlice.Endpoints, discovery.Endpoint{
Addresses: []string{ep.ip},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
NodeName: utilpointer.StringPtr(ep.hostname),
})
}

fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
syncProxyRulesNoLocalEndpointsTotalInternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal"))
if err != nil {
t.Errorf("failed to get %s value, err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err)
}

if tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal != int(syncProxyRulesNoLocalEndpointsTotalInternal) {
t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalInternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalInternal)
}

syncProxyRulesNoLocalEndpointsTotalExternal, err := testutil.GetGaugeMetricValue(metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external"))
if err != nil {
t.Errorf("failed to get %s value(external), err: %v", metrics.SyncProxyRulesNoLocalEndpointsTotal.Name, err)
}

if tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal != int(syncProxyRulesNoLocalEndpointsTotalExternal) {
t.Errorf("sync_proxy_rules_no_endpoints_total metric mismatch(internal): got=%d, expected %d", int(syncProxyRulesNoLocalEndpointsTotalExternal), tc.expectedSyncProxyRulesNoLocalEndpointsTotalExternal)
}
})
}
}
27 changes: 27 additions & 0 deletions pkg/proxy/ipvs/proxier.go
Expand Up @@ -273,6 +273,20 @@ type Proxier struct {
// Inject for test purpose.
networkInterfacer utilproxy.NetworkInterfacer
gracefuldeleteManager *GracefulTerminationManager
// serviceNoLocalEndpointsInternal represents the set of services that couldn't be applied
// due to the absence of local endpoints when the internal traffic policy is "Local".
// It is used to publish the sync_proxy_rules_no_endpoints_total
// metric with the traffic_policy label set to "internal".
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should say when the traffic policy label set to "Local".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this change?

// sets.String is used here since we end up calculating endpoint topology multiple times for the same Service
// if it has multiple ports but each Service should only be counted once.
serviceNoLocalEndpointsInternal sets.String
// serviceNoLocalEndpointsExternal irepresents the set of services that couldn't be applied
// due to the absence of any endpoints when the external traffic policy is "Local".
// It is used to publish the sync_proxy_rules_no_endpoints_total
// metric with the traffic_policy label set to "external".
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should say when the traffic policy is "Cluster"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this change?

// sets.String is used here since we end up calculating endpoint topology multiple times for the same Service
// if it has multiple ports but each Service should only be counted once.
serviceNoLocalEndpointsExternal sets.String
}

// IPGetter helps get node network interface IP and IPs binded to the IPVS dummy interface
Expand Down Expand Up @@ -1027,6 +1041,8 @@ func (proxier *Proxier) syncProxyRules() {

klog.V(3).InfoS("Syncing ipvs proxier rules")

proxier.serviceNoLocalEndpointsInternal = sets.NewString()
proxier.serviceNoLocalEndpointsExternal = sets.NewString()
// Begin install iptables

// Reset all buffers used later.
Expand Down Expand Up @@ -1599,6 +1615,9 @@ func (proxier *Proxier) syncProxyRules() {
}
}
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)

metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len()))
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
}

// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
Expand Down Expand Up @@ -1980,6 +1999,14 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
// Traffic from an external source will be routed but the reply
// will have the POD address and will be discarded.
endpoints = clusterEndpoints

if svcInfo.InternalPolicyLocal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
proxier.serviceNoLocalEndpointsInternal.Insert(svcPortName.NamespacedName.String())
}

if svcInfo.ExternalPolicyLocal() {
proxier.serviceNoLocalEndpointsExternal.Insert(svcPortName.NamespacedName.String())
}
}
} else {
endpoints = clusterEndpoints
Expand Down