Skip to content

Commit

Permalink
Merge pull request #96600 from maplain/internal-traffic-policy
Browse files Browse the repository at this point in the history
Service Internal Traffic Policy
  • Loading branch information
k8s-ci-robot committed Mar 8, 2021
2 parents 71764b1 + 62804c1 commit 2783f2f
Show file tree
Hide file tree
Showing 30 changed files with 1,853 additions and 947 deletions.
6 changes: 5 additions & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion pkg/api/v1/service/util.go
Expand Up @@ -20,7 +20,7 @@ import (
"fmt"
"strings"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
utilnet "k8s.io/utils/net"
)

Expand Down Expand Up @@ -76,6 +76,15 @@ func RequestsOnlyLocalTraffic(service *v1.Service) bool {
return service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal
}

// RequestsOnlyLocalTrafficForInternal checks if service prefers Node Local
// endpoints for internal traffic
func RequestsOnlyLocalTrafficForInternal(service *v1.Service) bool {
if service.Spec.InternalTrafficPolicy == nil {
return false
}
return *service.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal
}

// NeedsHealthCheck checks if service needs health check.
func NeedsHealthCheck(service *v1.Service) bool {
if service.Spec.Type != v1.ServiceTypeLoadBalancer {
Expand Down
29 changes: 28 additions & 1 deletion pkg/api/v1/service/util_test.go
Expand Up @@ -20,7 +20,7 @@ import (
"strings"
"testing"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
utilnet "k8s.io/utils/net"
)

Expand Down Expand Up @@ -214,3 +214,30 @@ func TestNeedsHealthCheck(t *testing.T) {
},
})
}

func TestRequestsOnlyLocalTrafficForInternal(t *testing.T) {
checkRequestsOnlyLocalTrafficForInternal := func(expected bool, service *v1.Service) {
res := RequestsOnlyLocalTrafficForInternal(service)
if res != expected {
t.Errorf("Expected internal local traffic = %v, got %v",
expected, res)
}
}

// default InternalTrafficPolicy is nil
checkRequestsOnlyLocalTrafficForInternal(false, &v1.Service{})

local := v1.ServiceInternalTrafficPolicyLocal
checkRequestsOnlyLocalTrafficForInternal(true, &v1.Service{
Spec: v1.ServiceSpec{
InternalTrafficPolicy: &local,
},
})

cluster := v1.ServiceInternalTrafficPolicyCluster
checkRequestsOnlyLocalTrafficForInternal(false, &v1.Service{
Spec: v1.ServiceSpec{
InternalTrafficPolicy: &cluster,
},
})
}
25 changes: 24 additions & 1 deletion pkg/apis/core/types.go
Expand Up @@ -3475,6 +3475,19 @@ const (
ServiceTypeExternalName ServiceType = "ExternalName"
)

// ServiceInternalTrafficPolicyType describes the type of traffic routing for
// internal traffic
type ServiceInternalTrafficPolicyType string

const (
// ServiceInternalTrafficPolicyCluster routes traffic to all endpoints
ServiceInternalTrafficPolicyCluster ServiceInternalTrafficPolicyType = "Cluster"

// ServiceInternalTrafficPolicyLocal only routes to node-local
// endpoints, otherwise drops the traffic
ServiceInternalTrafficPolicyLocal ServiceInternalTrafficPolicyType = "Local"
)

// ServiceExternalTrafficPolicyType string
type ServiceExternalTrafficPolicyType string

Expand Down Expand Up @@ -3739,9 +3752,19 @@ type ServiceSpec struct {
// implementation (e.g. cloud providers) should ignore Services that set this field.
// This field can only be set when creating or updating a Service to type 'LoadBalancer'.
// Once set, it can not be changed. This field will be wiped when a service is updated to a non 'LoadBalancer' type.
// featureGate=LoadBalancerClass
// +featureGate=LoadBalancerClass
// +optional
LoadBalancerClass *string

// InternalTrafficPolicy specifies if the cluster internal traffic
// should be routed to all endpoints or node-local endpoints only.
// "Cluster" routes internal traffic to a Service to all endpoints.
// "Local" routes traffic to node-local endpoints only, traffic is
// dropped if no node-local endpoints are ready.
// The default value is "Cluster".
// +featureGate=ServiceInternalTrafficPolicy
// +optional
InternalTrafficPolicy *ServiceInternalTrafficPolicyType
}

// ServicePort represents the port on which the service is exposed
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/core/v1/defaults.go
Expand Up @@ -131,6 +131,11 @@ func SetDefaults_Service(obj *v1.Service) {
obj.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeCluster
}

if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && obj.Spec.InternalTrafficPolicy == nil {
serviceInternalTrafficPolicyCluster := v1.ServiceInternalTrafficPolicyCluster
obj.Spec.InternalTrafficPolicy = &serviceInternalTrafficPolicyCluster
}

if utilfeature.DefaultFeatureGate.Enabled(features.ServiceLBNodePortControl) {
if obj.Spec.Type == v1.ServiceTypeLoadBalancer {
if obj.Spec.AllocateLoadBalancerNodePorts == nil {
Expand Down
64 changes: 64 additions & 0 deletions pkg/apis/core/v1/defaults_test.go
Expand Up @@ -29,8 +29,11 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/features"
utilpointer "k8s.io/utils/pointer"

// ensure types are installed
Expand Down Expand Up @@ -1798,3 +1801,64 @@ func TestSetDefaultEnableServiceLinks(t *testing.T) {
t.Errorf("Expected enableServiceLinks value: %+v\ngot: %+v\n", v1.DefaultEnableServiceLinks, *output.Spec.EnableServiceLinks)
}
}

func TestSetDefaultServiceInternalTrafficPolicy(t *testing.T) {
cluster := v1.ServiceInternalTrafficPolicyCluster
local := v1.ServiceInternalTrafficPolicyLocal
testCases := []struct {
name string
expectedInternalTrafficPolicy v1.ServiceInternalTrafficPolicyType
svc v1.Service
featureGateOn bool
}{
{
name: "must set default internalTrafficPolicy",
expectedInternalTrafficPolicy: v1.ServiceInternalTrafficPolicyCluster,
svc: v1.Service{},
featureGateOn: true,
},
{
name: "must not set default internalTrafficPolicy when it's cluster",
expectedInternalTrafficPolicy: v1.ServiceInternalTrafficPolicyCluster,
svc: v1.Service{
Spec: v1.ServiceSpec{
InternalTrafficPolicy: &cluster,
},
},
featureGateOn: true,
},
{
name: "must not set default internalTrafficPolicy when it's local",
expectedInternalTrafficPolicy: v1.ServiceInternalTrafficPolicyLocal,
svc: v1.Service{
Spec: v1.ServiceSpec{
InternalTrafficPolicy: &local,
},
},
featureGateOn: true,
},
{
name: "must not set default internalTrafficPolicy when gate is disabled",
expectedInternalTrafficPolicy: "",
svc: v1.Service{},
featureGateOn: false,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, test.featureGateOn)()
obj := roundTrip(t, runtime.Object(&test.svc))
svc := obj.(*v1.Service)

if test.expectedInternalTrafficPolicy == "" {
if svc.Spec.InternalTrafficPolicy != nil {
t.Fatalf("expected .spec.internalTrafficPolicy: null, got %v", *svc.Spec.InternalTrafficPolicy)
}
} else {
if *svc.Spec.InternalTrafficPolicy != test.expectedInternalTrafficPolicy {
t.Fatalf("expected .spec.internalTrafficPolicy: %v got %v", test.expectedInternalTrafficPolicy, *svc.Spec.InternalTrafficPolicy)
}
}
})
}
}
2 changes: 2 additions & 0 deletions pkg/apis/core/v1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions pkg/apis/core/validation/validation.go
Expand Up @@ -4171,6 +4171,8 @@ var supportedSessionAffinityType = sets.NewString(string(core.ServiceAffinityCli
var supportedServiceType = sets.NewString(string(core.ServiceTypeClusterIP), string(core.ServiceTypeNodePort),
string(core.ServiceTypeLoadBalancer), string(core.ServiceTypeExternalName))

var supportedServiceInternalTrafficPolicy = sets.NewString(string(core.ServiceInternalTrafficPolicyCluster), string(core.ServiceExternalTrafficPolicyTypeLocal))

var supportedServiceIPFamily = sets.NewString(string(core.IPv4Protocol), string(core.IPv6Protocol))
var supportedServiceIPFamilyPolicy = sets.NewString(string(core.IPFamilyPolicySingleStack), string(core.IPFamilyPolicyPreferDualStack), string(core.IPFamilyPolicyRequireDualStack))

Expand Down Expand Up @@ -4378,6 +4380,10 @@ func ValidateService(service *core.Service) field.ErrorList {

// external traffic fields
allErrs = append(allErrs, validateServiceExternalTrafficFieldsValue(service)...)

// internal traffic policy field
allErrs = append(allErrs, validateServiceInternalTrafficFieldsValue(service)...)

return allErrs
}

Expand Down Expand Up @@ -4446,6 +4452,24 @@ func validateServiceExternalTrafficFieldsValue(service *core.Service) field.Erro
return allErrs
}

// validateServiceInternalTrafficFieldsValue validates InternalTraffic related
// spec have legal value.
func validateServiceInternalTrafficFieldsValue(service *core.Service) field.ErrorList {
allErrs := field.ErrorList{}

if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) {
if service.Spec.InternalTrafficPolicy == nil {
allErrs = append(allErrs, field.Required(field.NewPath("spec").Child("internalTrafficPolicy"), ""))
}
}

if service.Spec.InternalTrafficPolicy != nil && !supportedServiceInternalTrafficPolicy.Has(string(*service.Spec.InternalTrafficPolicy)) {
allErrs = append(allErrs, field.NotSupported(field.NewPath("spec").Child("internalTrafficPolicy"), *service.Spec.InternalTrafficPolicy, supportedServiceInternalTrafficPolicy.List()))
}

return allErrs
}

// ValidateServiceExternalTrafficFieldsCombination validates if ExternalTrafficPolicy,
// HealthCheckNodePort and Type combination are legal. For update, it should be called
// after clearing externalTraffic related fields for the ease of transitioning between
Expand Down
27 changes: 24 additions & 3 deletions pkg/apis/core/validation/validation_test.go
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/capabilities"
Expand Down Expand Up @@ -10135,9 +10136,10 @@ func TestValidateServiceCreate(t *testing.T) {
preferDualStack := core.IPFamilyPolicyPreferDualStack

testCases := []struct {
name string
tweakSvc func(svc *core.Service) // given a basic valid service, each test case can customize it
numErrs int
name string
tweakSvc func(svc *core.Service) // given a basic valid service, each test case can customize it
numErrs int
featureGates []featuregate.Feature
}{
{
name: "missing namespace",
Expand Down Expand Up @@ -10750,6 +10752,22 @@ func TestValidateServiceCreate(t *testing.T) {
},
numErrs: 1,
},
{
name: "nil internalTraffic field when feature gate is on",
tweakSvc: func(s *core.Service) {
s.Spec.InternalTrafficPolicy = nil
},
featureGates: []featuregate.Feature{features.ServiceInternalTrafficPolicy},
numErrs: 1,
},
{
name: "invalid internalTraffic field",
tweakSvc: func(s *core.Service) {
invalid := core.ServiceInternalTrafficPolicyType("invalid")
s.Spec.InternalTrafficPolicy = &invalid
},
numErrs: 1,
},
{
name: "nagative healthCheckNodePort field",
tweakSvc: func(s *core.Service) {
Expand Down Expand Up @@ -11323,6 +11341,9 @@ func TestValidateServiceCreate(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
for i := range tc.featureGates {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, tc.featureGates[i], true)()
}
svc := makeValidService()
tc.tweakSvc(&svc)
errs := ValidateServiceCreate(&svc)
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/core/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/features/kube_features.go
Expand Up @@ -709,6 +709,12 @@ const (
//
// Enable Scope and Namespace fields on IngressClassParametersReference.
IngressClassNamespacedParams featuregate.Feature = "IngressClassNamespacedParams"

// owner: @maplain @andrewsykim
// alpha: v1.21
//
// Enables node-local routing for Service internal traffic
ServiceInternalTrafficPolicy featuregate.Feature = "ServiceInternalTrafficPolicy"
)

func init() {
Expand Down Expand Up @@ -816,6 +822,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
ServiceLoadBalancerClass: {Default: false, PreRelease: featuregate.Alpha},
LogarithmicScaleDown: {Default: false, PreRelease: featuregate.Alpha},
IngressClassNamespacedParams: {Default: false, PreRelease: featuregate.Alpha},
ServiceInternalTrafficPolicy: {Default: false, PreRelease: featuregate.Alpha},

// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:
Expand Down

0 comments on commit 2783f2f

Please sign in to comment.