Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Step 2 – controlplane/apiserver: move peer proxy code to allow generic aggregator construction #124576

Merged
merged 1 commit into from Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/config.go
Expand Up @@ -84,7 +84,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
c.ApiExtensions = apiExtensions

aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.Extra.PeerProxy, pluginInitializer)
aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.ControlPlane.Extra.PeerProxy, pluginInitializer)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/kube-apiserver/app/server.go
Expand Up @@ -265,14 +265,14 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
}

if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
config.Extra.PeerEndpointLeaseReconciler, err = controlplane.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
config.ControlPlane.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
if err != nil {
return nil, nil, nil, err
}
// build peer proxy config only if peer ca file exists
if opts.PeerCAFile != "" {
config.Extra.PeerProxy, err = controlplane.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.Extra.PeerEndpointLeaseReconciler, config.ControlPlane.Generic.Serializer)
config.ControlPlane.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ControlPlane.Extra.PeerEndpointLeaseReconciler, config.ControlPlane.Generic.Serializer)
if err != nil {
return nil, nil, nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/controlplane/apiserver/completion.go
Expand Up @@ -40,5 +40,9 @@ func (c *Config) Complete() CompletedConfig {
discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.Generic.ExternalAddress}
cfg.Generic.DiscoveryAddresses = discoveryAddresses

if cfg.Extra.PeerEndpointReconcileInterval == 0 {
cfg.Extra.PeerEndpointReconcileInterval = DefaultPeerEndpointReconcileInterval
}

return CompletedConfig{&cfg}
}
14 changes: 14 additions & 0 deletions pkg/controlplane/apiserver/config.go
Expand Up @@ -31,12 +31,14 @@ import (
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/openapi"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/version"
Expand Down Expand Up @@ -67,6 +69,18 @@ type Extra struct {
EnableLogsSupport bool
ProxyTransport *http.Transport

// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
// that can not be served locally
PeerProxy utilpeerproxy.Interface
// PeerEndpointReconcileInterval defines how often the endpoint leases are reconciled in etcd.
PeerEndpointReconcileInterval time.Duration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this used, if anywhere? it seems net new?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's defaulted in the generic code, and overridden with the service endpoint reconciler interval (= old behaviour) for kube-apiserver.

// PeerEndpointLeaseReconciler updates the peer endpoint leases
PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress

ServiceAccountIssuer serviceaccount.TokenGenerator
ServiceAccountMaxExpiration time.Duration
ExtendExpiration bool
Expand Down
89 changes: 89 additions & 0 deletions pkg/controlplane/apiserver/peer.go
@@ -0,0 +1,89 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apiserver

import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/reconcilers"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
)

const (
// DefaultPeerEndpointReconcileInterval is the default amount of time for how often
// the peer endpoint leases are reconciled.
DefaultPeerEndpointReconcileInterval = 10 * time.Second
// DefaultPeerEndpointReconcilerTTL is the default TTL timeout for peer endpoint
// leases on the storage layer
DefaultPeerEndpointReconcilerTTL = 15 * time.Second
)

func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
if proxyClientCertFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
}
if proxyClientKeyFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
}
// create proxy client config
clientConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: false,
CertFile: proxyClientCertFile,
KeyFile: proxyClientKeyFile,
CAFile: peerCAFile,
ServerName: "kubernetes.default.svc",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this leak into generic or be an option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An option in follow-up (step 3).

}}

// build proxy transport
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
if transportBuildingError != nil {
klog.Error(transportBuildingError.Error())
return nil, transportBuildingError
}
return utilpeerproxy.NewPeerProxyHandler(
versionedInformer,
svm,
proxyRoundTripper,
apiServerID,
reconciler,
serializer,
), nil
}

// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) {
ttl := DefaultPeerEndpointReconcilerTTL
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
if err != nil {
return nil, fmt.Errorf("error creating storage factory config: %w", err)
}
reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}
78 changes: 11 additions & 67 deletions pkg/controlplane/instance.go
Expand Up @@ -54,7 +54,6 @@ import (
storageapiv1beta1 "k8s.io/api/storage/v1beta1"
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -67,14 +66,10 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
"k8s.io/client-go/transport"
"k8s.io/component-helpers/apimachinery/lease"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
Expand Down Expand Up @@ -157,16 +152,6 @@ type Extra struct {
EndpointReconcilerConfig EndpointReconcilerConfig
KubeletClientConfig kubeletclient.KubeletClientConfig

// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
// that can not be served locally
PeerProxy utilpeerproxy.Interface
// PeerEndpointLeaseReconciler updates the peer endpoint leases
PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress

// Values to build the IP addresses used by discovery
// The range of IPs to be assigned to services with type=ClusterIP or greater
ServiceIPRange net.IPNet
Expand Down Expand Up @@ -290,6 +275,12 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {

// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() CompletedConfig {
if c.ControlPlane.PeerEndpointReconcileInterval == 0 && c.EndpointReconcilerConfig.Interval != 0 {
// default this to the endpoint reconciler value before the generic
// controlplane completion can kick in
c.ControlPlane.PeerEndpointReconcileInterval = c.EndpointReconcilerConfig.Interval
}

cfg := completedConfig{
c.ControlPlane.Complete(),
&c.Extra,
Expand Down Expand Up @@ -508,11 +499,11 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
}

if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peerEndpointCtrl := peerreconcilers.New(
c.ControlPlane.Generic.APIServerID,
peeraddress,
c.Extra.PeerEndpointLeaseReconciler,
c.ControlPlane.Extra.PeerEndpointLeaseReconciler,
c.Extra.EndpointReconcilerConfig.Interval,
client)
if err != nil {
Expand All @@ -529,9 +520,9 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil
})
// Add PostStartHooks for Unknown Version Proxy filter.
if c.Extra.PeerProxy != nil {
if c.ControlPlane.Extra.PeerProxy != nil {
m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
err := c.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
err := c.ControlPlane.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
return err
})
}
Expand Down Expand Up @@ -579,7 +570,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
leaseName := m.GenericAPIServer.APIServerID
holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())

peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
// must replace ':,[]' in [ip:port] to be able to store this as a valid label value
controller := lease.NewController(
clock.RealClock{},
Expand Down Expand Up @@ -782,53 +773,6 @@ func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
return ret
}

// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (peerreconcilers.PeerEndpointLeaseReconciler, error) {
ttl := DefaultEndpointReconcilerTTL
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
if err != nil {
return nil, fmt.Errorf("error creating storage factory config: %w", err)
}
reconciler, err := peerreconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}

func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress,
apiServerID string, reconciler peerreconcilers.PeerEndpointLeaseReconciler, serializer kruntime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
if proxyClientCertFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
}
if proxyClientKeyFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
}
// create proxy client config
clientConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: false,
CertFile: proxyClientCertFile,
KeyFile: proxyClientKeyFile,
CAFile: peerCAFile,
ServerName: "kubernetes.default.svc",
}}

// build proxy transport
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
if transportBuildingError != nil {
klog.Error(transportBuildingError.Error())
return nil, transportBuildingError
}
return utilpeerproxy.NewPeerProxyHandler(
versionedInformer,
svm,
proxyRoundTripper,
apiServerID,
reconciler,
serializer,
), nil
}

// utility function to get the apiserver address that is used by peer apiservers to proxy
// requests to this apiserver in case the peer is incapable of serving the request
func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this kubeapi-specific code or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generic. It will be moved in #120202.

Expand Down