Skip to content

Commit

Permalink
Configurable utilization metrics scraping (#2056)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdavidsmith committed Jan 27, 2023
1 parent 6141171 commit 93f4bb9
Show file tree
Hide file tree
Showing 18 changed files with 816 additions and 92 deletions.
8 changes: 8 additions & 0 deletions deployment/executor/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ rules:
- create
- delete
- deletecollection
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- get
- list
- watch
- apiGroups:
- "networking.k8s.io"
resources:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ require (
github.com/magefile/mage v1.14.0
github.com/matryer/moq v0.3.0
github.com/openconfig/goyang v1.2.0
github.com/prometheus/common v0.37.0
github.com/sanity-io/litter v1.5.5
)

Expand Down Expand Up @@ -293,7 +294,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pquerna/cachecontrol v0.1.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/rivo/uniseg v0.4.2 // indirect
Expand Down
18 changes: 12 additions & 6 deletions internal/executor/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
"fmt"
"net/http"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -131,11 +132,15 @@ func StartUpWithContext(
)

nodeInfoService := node.NewKubernetesNodeInfoService(clusterContext, config.Kubernetes.ToleratedTaints)
queueUtilisationService := utilisation.NewMetricsServerQueueUtilisationService(
clusterContext, nodeInfoService)
podUtilisationService := utilisation.NewPodUtilisationService(
clusterContext,
nodeInfoService,
config.Metric.CustomUsageMetrics,
&http.Client{Timeout: 15 * time.Second},
)
clusterUtilisationService := utilisation.NewClusterUtilisationService(
clusterContext,
queueUtilisationService,
podUtilisationService,
nodeInfoService,
usageClient,
config.Kubernetes.TrackedNodeLabels,
Expand Down Expand Up @@ -180,18 +185,19 @@ func StartUpWithContext(

resourceCleanupService := service.NewResourceCleanupService(clusterContext, config.Kubernetes)

pod_metrics.ExposeClusterContextMetrics(clusterContext, clusterUtilisationService, queueUtilisationService, nodeInfoService)
pod_metrics.ExposeClusterContextMetrics(clusterContext, clusterUtilisationService, podUtilisationService, nodeInfoService)

taskManager.Register(eventReporter.ReportMissingJobEvents, config.Task.MissingJobEventReconciliationInterval, "event_reconciliation")
taskManager.Register(clusterAllocationService.AllocateSpareClusterCapacity, config.Task.AllocateSpareClusterCapacityInterval, "job_lease_request")
taskManager.Register(resourceCleanupService.CleanupResources, config.Task.ResourceCleanupInterval, "resource_cleanup")

if config.Metric.ExposeQueueUsageMetrics {
taskManager.Register(queueUtilisationService.RefreshUtilisationData, config.Task.QueueUsageDataRefreshInterval, "pod_usage_data_refresh")
taskManager.Register(podUtilisationService.RefreshUtilisationData, config.Task.QueueUsageDataRefreshInterval, "pod_usage_data_refresh")

if config.Task.UtilisationEventReportingInterval > 0 {
podUtilisationReporter := utilisation.NewUtilisationEventReporter(
clusterContext,
queueUtilisationService,
podUtilisationService,
eventReporter,
config.Task.UtilisationEventReportingInterval)
taskManager.Register(
Expand Down
23 changes: 23 additions & 0 deletions internal/executor/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,31 @@ type TaskConfiguration struct {
// MetricConfiguration holds the executor's metrics-related settings.
type MetricConfiguration struct {
	// Port is a 16-bit port number.
	// NOTE(review): presumably the port metrics are served on — confirm
	// against the code that reads it.
	Port uint16

	// ExposeQueueUsageMetrics gates registration of the periodic pod usage
	// data refresh task (and the optional utilisation event reporter) at
	// executor start-up.
	ExposeQueueUsageMetrics bool

	// CustomUsageMetrics lists the custom usage metric sources; it is handed
	// to the pod utilisation service when the executor starts.
	CustomUsageMetrics []CustomUsageMetrics
}

// CustomUsageMetrics describes one group of custom usage metrics together
// with the namespace and label used to select where they come from.
//
// NOTE(review): the Namespace / EndpointSelectorLabel* fields mirror the
// parameters of ClusterContext.GetEndpointSlices and are presumably passed to
// it to discover the endpoints to scrape — confirm against the consumer.
type CustomUsageMetrics struct {
	// Namespace is the namespace to look in.
	Namespace string

	// EndpointSelectorLabelName is the name of the selecting label.
	EndpointSelectorLabelName string

	// EndpointSelectorLabelValue is the value the selecting label must have.
	EndpointSelectorLabelValue string

	// Metrics are the individual metrics belonging to this group.
	Metrics []CustomUsageMetric
}

// CustomUsageMetric configures a single custom usage metric.
//
// NOTE(review): the field semantics below are inferred from the field names
// only; verify against the code that consumes this configuration.
type CustomUsageMetric struct {
	// Name identifies the metric (presumably the name it is reported under).
	Name string

	// PrometheusMetricName is presumably the name of the Prometheus metric
	// to read.
	PrometheusMetricName string

	// PrometheusPodNameLabel is presumably the Prometheus label that carries
	// the pod name.
	PrometheusPodNameLabel string

	// AggregateType selects the aggregation (see the AggregateType constants).
	AggregateType AggregateType

	// Multiplier is a floating-point factor, presumably applied to the
	// scraped value.
	Multiplier float64
}

// AggregateType names the aggregation selected for a custom usage metric.
type AggregateType string

// Supported AggregateType values.
//
// Every constant carries the explicit AggregateType type. Without it, a
// constant with its own initializer (as Mean previously had) is an untyped
// string constant rather than an AggregateType.
const (
	Sum  AggregateType = "Sum"
	Mean AggregateType = "Mean"
)

type ExecutorConfiguration struct {
HttpPort uint16
Metric MetricConfiguration
Expand Down
21 changes: 21 additions & 0 deletions internal/executor/context/cluster_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
networking "k8s.io/api/networking/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -18,6 +19,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/informers"
informer "k8s.io/client-go/informers/core/v1"
discovery_informer "k8s.io/client-go/informers/discovery/v1"
network_informer "k8s.io/client-go/informers/networking/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -54,6 +56,7 @@ type ClusterContext interface {
GetPodEvents(pod *v1.Pod) ([]*v1.Event, error)
GetServices(pod *v1.Pod) ([]*v1.Service, error)
GetIngresses(pod *v1.Pod) ([]*networking.Ingress, error)
GetEndpointSlices(namespace string, labelName string, labelValue string) ([]*discovery.EndpointSlice, error)

SubmitPod(pod *v1.Pod, owner string, ownerGroups []string) (*v1.Pod, error)
SubmitService(service *v1.Service) (*v1.Service, error)
Expand All @@ -79,6 +82,7 @@ type KubernetesClusterContext struct {
nodeInformer informer.NodeInformer
serviceInformer informer.ServiceInformer
ingressInformer network_informer.IngressInformer
endpointSliceInformer discovery_informer.EndpointSliceInformer
stopper chan struct{}
kubernetesClient kubernetes.Interface
kubernetesClientProvider cluster.KubernetesClientProvider
Expand Down Expand Up @@ -120,6 +124,7 @@ func NewClusterContext(
eventInformer: factory.Core().V1().Events(),
serviceInformer: factory.Core().V1().Services(),
ingressInformer: factory.Networking().V1().Ingresses(),
endpointSliceInformer: factory.Discovery().V1().EndpointSlices(),
kubernetesClient: kubernetesClient,
kubernetesClientProvider: kubernetesClientProvider,
etcdHealthMonitor: etcdHealthMonitor,
Expand All @@ -142,6 +147,7 @@ func NewClusterContext(
context.nodeInformer.Lister()
context.serviceInformer.Lister()
context.ingressInformer.Lister()
context.endpointSliceInformer.Lister()

err := context.eventInformer.Informer().AddIndexers(cache.Indexers{podByUIDIndex: indexPodByUID})
if err != nil {
Expand Down Expand Up @@ -490,6 +496,21 @@ func (c *KubernetesClusterContext) GetIngresses(pod *v1.Pod) ([]*networking.Ingr
return ingresses, err
}

// GetEndpointSlices returns the EndpointSlices in the given namespace whose
// label labelName equals labelValue, as listed by the endpoint-slice
// informer's lister.
//
// An error is returned if labelName/labelValue do not form a valid label
// requirement, or if listing fails; in the latter case the underlying error
// is wrapped so callers can still inspect it with errors.Is / errors.As.
func (c *KubernetesClusterContext) GetEndpointSlices(namespace string, labelName string, labelValue string) ([]*discovery.EndpointSlice, error) {
	req, err := labels.NewRequirement(labelName, selection.Equals, []string{labelValue})
	if err != nil {
		return nil, err
	}
	selector := labels.NewSelector().Add(*req)

	endpointSlices, err := c.endpointSliceInformer.Lister().EndpointSlices(namespace).List(selector)
	if err != nil {
		// Go has no string interpolation: the previous "#{...}" placeholders
		// were emitted literally and the cause was dropped.
		return nil, fmt.Errorf(
			"cannot get endpointslices with label %s=%s in namespace %s: %w",
			labelName, labelValue, namespace, err,
		)
	}

	return endpointSlices, nil
}

func createPodAssociationSelector(pod *v1.Pod) (*labels.Selector, error) {
jobId, jobIdPresent := pod.Labels[domain.JobId]
queue, queuePresent := pod.Labels[domain.Queue]
Expand Down
5 changes: 5 additions & 0 deletions internal/executor/context/fake/sync_cluster_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubelet/pkg/apis/stats/v1alpha1"
Expand Down Expand Up @@ -81,6 +82,10 @@ func (c *SyncFakeClusterContext) GetIngresses(pod *v1.Pod) ([]*networking.Ingres
return nil, fmt.Errorf("Ingresses not implemented in SyncFakeClusterContext")
}

// GetEndpointSlices is not supported by SyncFakeClusterContext: it ignores its
// arguments and always returns a nil slice together with an error.
func (c *SyncFakeClusterContext) GetEndpointSlices(namespace string, labelName string, labelValue string) ([]*discovery.EndpointSlice, error) {
	var none []*discovery.EndpointSlice
	notImplemented := fmt.Errorf("EndpointSlices not implemented in SyncFakeClusterContext")
	return none, notImplemented
}

func (c *SyncFakeClusterContext) DeleteIngress(ingress *networking.Ingress) error {
return fmt.Errorf("Ingresses not implemented in SyncFakeClusterContext")
}
Expand Down
6 changes: 6 additions & 0 deletions internal/executor/fake/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package context

import (
"context"
"fmt"
"math/rand"
"regexp"
"sort"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -190,6 +192,10 @@ func (c *FakeClusterContext) GetIngresses(pod *v1.Pod) ([]*networking.Ingress, e
return nil, errors.Errorf("Ingresses not implemented in FakeClusterContext")
}

// GetEndpointSlices is not supported by FakeClusterContext: it ignores its
// arguments and always returns a nil slice together with an error.
func (c *FakeClusterContext) GetEndpointSlices(namespace string, labelName string, labelValue string) ([]*discovery.EndpointSlice, error) {
	// The message names this type (it previously said SyncFakeClusterContext,
	// a copy-paste slip that pointed readers at the wrong fake).
	return nil, fmt.Errorf("EndpointSlices not implemented in FakeClusterContext")
}

func (c *FakeClusterContext) DeleteIngress(ingress *networking.Ingress) error {
return errors.Errorf("Ingresses not implemented in FakeClusterContext")
}
Expand Down
4 changes: 2 additions & 2 deletions internal/executor/utilisation/cluster_utilisation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func TestCreateReportsOfQueueUsages(t *testing.T) {
utilisationService := &ClusterUtilisationService{
queueUtilisationService: NewMetricsServerQueueUtilisationService(nil, nil),
queueUtilisationService: NewPodUtilisationService(nil, nil, nil, nil),
}

var priority int32
Expand All @@ -39,7 +39,7 @@ func TestCreateReportsOfQueueUsages(t *testing.T) {

func TestCreateReportsOfQueueUsages_WhenAllPending(t *testing.T) {
utilisationService := &ClusterUtilisationService{
queueUtilisationService: NewMetricsServerQueueUtilisationService(nil, nil),
queueUtilisationService: NewPodUtilisationService(nil, nil, nil, nil),
}

var priority int32
Expand Down

0 comments on commit 93f4bb9

Please sign in to comment.