Skip to content

Commit

Permalink
pod-scaler: add labels to Build Pods via admission
Browse files Browse the repository at this point in the history
We need labels on Pods to associate individual executions with their
semantic category. Today, we label Builds (as that's what we create) but
the Build subsystem does not carry those labels forward to the Pods that
implement the Build. This patch adds a new mutating admission webhook
server that will add the labels where necessary.

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
  • Loading branch information
stevekuznetsov committed Apr 21, 2021
1 parent cf9a81d commit b313c28
Show file tree
Hide file tree
Showing 17 changed files with 869 additions and 7 deletions.
211 changes: 211 additions & 0 deletions cmd/pod-scaler/admission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/sirupsen/logrus"
admissionv1 "k8s.io/api/admission/v1"
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
prowConfig "k8s.io/test-infra/prow/config"
"k8s.io/test-infra/prow/flagutil"
"k8s.io/test-infra/prow/interrupts"
"k8s.io/test-infra/prow/metrics"
"k8s.io/test-infra/prow/simplifypath"

buildv1 "github.com/openshift/api/build/v1"
buildclientv1 "github.com/openshift/client-go/build/clientset/versioned/typed/build/v1"

"github.com/openshift/ci-tools/pkg/steps"
)

var scheme = runtime.NewScheme()
var codecs = serializer.NewCodecFactory(scheme)

func init() {
addToScheme(scheme)
}

func addToScheme(scheme *runtime.Scheme) {
utilruntime.Must(corev1.AddToScheme(scheme))
utilruntime.Must(admissionv1.AddToScheme(scheme))
utilruntime.Must(admissionregistrationv1.AddToScheme(scheme))
}

// toAdmissionResponse is a helper function to create an AdmissionResponse
// with an embedded error
func toAdmissionResponse(err error) *admissionv1.AdmissionResponse {
return &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}

// admitFunc is the type we use for all of our validators and mutators
type admitFunc func(admissionv1.AdmissionReview) *admissionv1.AdmissionResponse

// serve handles the http portion of a request prior to handing to an admit
// function
func serve(admit admitFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
}

// verify the content type is accurate
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
logrus.Errorf("contentType=%s, expect application/json", contentType)
return
}

// The AdmissionReview that was sent to the webhook
requestedAdmissionReview := admissionv1.AdmissionReview{}

// The AdmissionReview that will be returned
responseAdmissionReview := admissionv1.AdmissionReview{}

deserializer := codecs.UniversalDeserializer()
if _, _, err := deserializer.Decode(body, nil, &requestedAdmissionReview); err != nil {
logrus.WithError(err).Error("could not decode admission review")
responseAdmissionReview.Response = toAdmissionResponse(err)
} else {
// pass to admitFunc
responseAdmissionReview.Response = admit(requestedAdmissionReview)
}

// Return the same UID
responseAdmissionReview.Response.UID = requestedAdmissionReview.Request.UID

respBytes, err := json.Marshal(responseAdmissionReview)
if err != nil {
logrus.Error(err)
}
if _, err := w.Write(respBytes); err != nil {
logrus.Error(err)
}
}
}

// l keeps the tree legible
func l(fragment string, children ...simplifypath.Node) simplifypath.Node {
return simplifypath.L(fragment, children...)
}

var (
admissionMetrics = metrics.NewMetrics("pod_scaler_admission")
)

func admission(port int, client buildclientv1.BuildV1Interface) {
logger := logrus.WithField("component", "admission")
server := &admissionServer{
logger: logger,
client: client,
}

metrics.ExposeMetrics("pod_scaler_admission", prowConfig.PushGateway{}, flagutil.DefaultMetricsPort)
simplifier := simplifypath.NewSimplifier(l("", // shadow element mimicing the root
l("pods"),
))
handler := metrics.TraceHandler(simplifier, admissionMetrics.HTTPRequestDuration, admissionMetrics.HTTPResponseSize)
mux := http.NewServeMux()
mux.HandleFunc("/pods", handler(serve(mutatePods(server))).ServeHTTP)
httpServer := &http.Server{Addr: ":" + strconv.Itoa(port), Handler: mux}
interrupts.ListenAndServe(httpServer, 5*time.Second)
logger.Debug("Ready to serve HTTP requests.")
}

type admissionServer struct {
logger *logrus.Entry
client buildclientv1.BuildV1Interface
}

func mutatePods(server *admissionServer) admitFunc {
return func(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
logger := server.logger.WithField("admission_uid", ar.Request.UID)
podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
if ar.Request.Resource != podResource {
logger.Errorf("expect resource to be %s", podResource)
return toAdmissionResponse(fmt.Errorf("got incorrect resource %q, wanted %q", ar.Request.Resource, podResource))
}

raw := ar.Request.Object.Raw
pod := corev1.Pod{}
deserializer := codecs.UniversalDeserializer()
if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
logger.Error(err)
return toAdmissionResponse(err)
}
logger = logger.WithFields(logrus.Fields{
"namespace": pod.Namespace,
"name": pod.Name,
})
reviewResponse := admissionv1.AdmissionResponse{}
reviewResponse.Allowed = true
mutated := pod.DeepCopy()
addMissingLabels(server.client, mutated, server.logger)
patch, err := patchFor(&pod, mutated)
if err != nil {
logger.WithError(err).Warn("Failed to determine JSON patch for mutation.")
return &reviewResponse
}
if string(patch) == `{}` {
return &reviewResponse
}
reviewResponse.Patch = patch
pt := admissionv1.PatchTypeJSONPatch
reviewResponse.PatchType = &pt
return &reviewResponse
}
}

func addMissingLabels(client buildclientv1.BuildV1Interface, pod *corev1.Pod, logger *logrus.Entry) {
buildName, isBuildPod := pod.Labels[buildv1.BuildLabel]
if !isBuildPod {
return
}
logger = logger.WithField("build", buildName)
logger.Debug("Handling labels on Pod created for a Build.")
build, err := client.Builds(pod.Namespace).Get(interrupts.Context(), buildName, metav1.GetOptions{})
if err != nil {
logger.WithError(err).Error("Could not get Build for Pod.")
return
}
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
for _, label := range []string{steps.LabelMetadataOrg, steps.LabelMetadataRepo, steps.LabelMetadataBranch, steps.LabelMetadataVariant, steps.LabelMetadataTarget, steps.LabelMetadataStep} {
buildValue, buildHas := build.Labels[label]
_, podHas := pod.Labels[label]
if buildHas && !podHas {
pod.Labels[label] = buildValue
}
}
return
}

func patchFor(old, new *corev1.Pod) ([]byte, error) {
oldData, err := json.Marshal(old)
if err != nil {
return []byte{}, err
}
newData, err := json.Marshal(new)
if err != nil {
return []byte{}, err
}
return jsonpatch.CreateMergePatch(oldData, newData)
}
114 changes: 114 additions & 0 deletions cmd/pod-scaler/admission_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package main

import (
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"

buildv1 "github.com/openshift/api/build/v1"
"github.com/openshift/ci-tools/pkg/testhelper"
fakebuildv1client "github.com/openshift/client-go/build/clientset/versioned/fake"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestMutatePods(t *testing.T) {
client := fakebuildv1client.NewSimpleClientset(
&buildv1.Build{
TypeMeta: metav1.TypeMeta{
Kind: "Build",
APIVersion: "build.openshift.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "withoutlabels",
Labels: map[string]string{},
},
},
&buildv1.Build{
TypeMeta: metav1.TypeMeta{
Kind: "Build",
APIVersion: "build.openshift.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "withlabels",
Labels: map[string]string{
"ci.openshift.io/metadata.org": "org",
"ci.openshift.io/metadata.repo": "repo",
"ci.openshift.io/metadata.branch": "branch",
"ci.openshift.io/metadata.variant": "variant",
"ci.openshift.io/metadata.target": "target",
"ci.openshift.io/metadata.step": "step",
},
},
},
)

a := &admissionServer{
logger: logrus.WithField("test", t.Name()),
client: client.BuildV1(),
}
server := httptest.NewServer(serve(mutatePods(a)))
defer server.Close()

var testCases = []struct {
name string
request string
response string
}{
{
name: "pod not associated with a build",
request: `{
"apiVersion": "admission.k8s.io/v1","kind": "AdmissionReview",
"request": {
"uid": "705ab4f5-6393-11e8-b7cc-42010a800002",
"resource": {"group":"","version":"v1","resource":"pods"},
"object": {"apiVersion": "v1","kind": "Pod","metadata": {"name": "somethingelse","namespace": "namespace"}}
}
}`,
},
{
name: "pod associated with a build that has no labels",
request: `{"apiVersion": "admission.k8s.io/v1","kind": "AdmissionReview",
"request": {
"uid": "705ab4f5-6393-11e8-b7cc-42010a800002",
"resource": {"group":"","version":"v1","resource":"pods"},
"object": {"apiVersion": "v1","kind": "Pod","metadata": {"labels": {"openshift.io/build.name": "withoutlabels"}, "name": "withoutlabels-build","namespace": "namespace"}}
}
}`,
},
{
name: "pod associated with a build with labels",
request: `{"apiVersion": "admission.k8s.io/v1","kind": "AdmissionReview",
"request": {
"uid": "705ab4f5-6393-11e8-b7cc-42010a800002",
"resource": {"group":"","version":"v1","resource":"pods"},
"object": {"apiVersion": "v1","kind": "Pod","metadata": {"labels": {"openshift.io/build.name": "withlabels"}, "name": "withoutlabels-build","namespace": "namespace"}}
}
}`,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
httpClient := &http.Client{}
req, err := http.NewRequest(http.MethodPut, server.URL+"/pods", strings.NewReader(testCase.request))
if err != nil {
t.Fatalf("%s: could not create request: %v", testCase.name, err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := httpClient.Do(req)
if err != nil {
t.Fatalf("%s: got unexpected error from server: %v", testCase.name, err)
}
response, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("%s: got unexpected error reading response: %v", testCase.name, err)
}
testhelper.CompareWithFixture(t, response)
})
}
}
31 changes: 24 additions & 7 deletions cmd/pod-scaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"gopkg.in/fsnotify.v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"k8s.io/test-infra/prow/interrupts"

buildclientset "github.com/openshift/client-go/build/clientset/versioned/typed/build/v1"
routeclientset "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1"

"github.com/openshift/ci-tools/pkg/util"
Expand Down Expand Up @@ -47,7 +49,7 @@ func bindOptions(fs *flag.FlagSet) *options {
o := options{producerOptions: producerOptions{}}
fs.StringVar(&o.mode, "mode", "", "Which mode to run in.")
fs.StringVar(&o.kubeconfig, "kubeconfig", "", "Path to a ~/.kube/config to use for querying Prometheuses. Each context will be considered a cluster to query.")
fs.IntVar(&o.port, "port", 0, "Port to serve requirements on.")
fs.IntVar(&o.port, "port", 0, "Port to serve admission webhooks on.")
fs.IntVar(&o.uiPort, "ui-port", 0, "Port to serve frontend on.")
fs.StringVar(&o.loglevel, "loglevel", "debug", "Logging level.")
fs.StringVar(&o.cacheDir, "cache-dir", "", "Local directory holding cache data (for development mode).")
Expand All @@ -63,15 +65,16 @@ func (o *options) validate() error {
if o.kubeconfig == "" && !kubeconfigSet {
return errors.New("--kubeconfig or $KUBECONFIG is required")
}
case "consumer":
if o.port == 0 {
return errors.New("--port is required")
}
case "consumer.ui":
if o.uiPort == 0 {
return errors.New("--ui-port is required")
}
case "consumer.admission":
if o.port == 0 {
return errors.New("--port is required")
}
default:
return errors.New("--mode must be either \"producer\" or \"consumer\"")
return errors.New("--mode must be either \"producer\", \"consumer.ui\", or \"consumer.admission\"")
}
if o.cacheDir == "" {
if o.cacheBucket == "" {
Expand Down Expand Up @@ -111,8 +114,10 @@ func main() {
switch opts.mode {
case "producer":
mainProduce(opts, cache)
case "consumer":
case "consumer.ui":
// TODO
case "consumer.admission":
mainAdmission(opts)
}
interrupts.WaitForGracefulShutdown()
}
Expand Down Expand Up @@ -151,3 +156,15 @@ func mainProduce(opts *options, cache cache) {

go produce(clients, cache)
}

func mainAdmission(opts *options) {
restConfig, err := rest.InClusterConfig()
if err != nil {
logrus.WithError(err).Fatal("Failed to load in-cluster config.")
}
client, err := buildclientset.NewForConfig(restConfig)
if err != nil {
logrus.WithError(err).Fatal("Failed to construct client.")
}
go admission(opts.port, client)
}

0 comments on commit b313c28

Please sign in to comment.