Skip to content

Commit

Permalink
pod-scaler: add labels to Build Pods via admission
Browse files Browse the repository at this point in the history
We need labels on Pods to associate individual executions with their
semantic category. Today, we label Builds (as that's what we create) but
the Build subsystem does not carry those labels forward to the Pods that
implement the Build. This patch adds a new mutating admission webhook
server that will add the labels where necessary.

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
  • Loading branch information
stevekuznetsov committed Apr 21, 2021
1 parent cf9a81d commit ea1799e
Show file tree
Hide file tree
Showing 17 changed files with 875 additions and 7 deletions.
211 changes: 211 additions & 0 deletions cmd/pod-scaler/admission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/sirupsen/logrus"

admissionv1 "k8s.io/api/admission/v1"
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
prowConfig "k8s.io/test-infra/prow/config"
"k8s.io/test-infra/prow/flagutil"
"k8s.io/test-infra/prow/interrupts"
"k8s.io/test-infra/prow/metrics"
"k8s.io/test-infra/prow/simplifypath"

buildv1 "github.com/openshift/api/build/v1"
buildclientv1 "github.com/openshift/client-go/build/clientset/versioned/typed/build/v1"

"github.com/openshift/ci-tools/pkg/steps"
)

var scheme = runtime.NewScheme()
var codecs = serializer.NewCodecFactory(scheme)

func init() {
addToScheme(scheme)
}

func addToScheme(scheme *runtime.Scheme) {
utilruntime.Must(corev1.AddToScheme(scheme))
utilruntime.Must(admissionv1.AddToScheme(scheme))
utilruntime.Must(admissionregistrationv1.AddToScheme(scheme))
}

// toAdmissionResponse is a helper function to create an AdmissionResponse
// with an embedded error
func toAdmissionResponse(err error) *admissionv1.AdmissionResponse {
return &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}

// admitFunc is the type we use for all of our validators and mutators
type admitFunc func(admissionv1.AdmissionReview) *admissionv1.AdmissionResponse

// serve handles the http portion of a request prior to handing to an admit
// function
func serve(admit admitFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
}

// verify the content type is accurate
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
logrus.Errorf("contentType=%s, expect application/json", contentType)
return
}

// The AdmissionReview that was sent to the webhook
requestedAdmissionReview := admissionv1.AdmissionReview{}

// The AdmissionReview that will be returned
responseAdmissionReview := admissionv1.AdmissionReview{}

deserializer := codecs.UniversalDeserializer()
if _, _, err := deserializer.Decode(body, nil, &requestedAdmissionReview); err != nil {
logrus.WithError(err).Error("could not decode admission review")
responseAdmissionReview.Response = toAdmissionResponse(err)
} else {
// pass to admitFunc
responseAdmissionReview.Response = admit(requestedAdmissionReview)
}

// Return the same UID
responseAdmissionReview.Response.UID = requestedAdmissionReview.Request.UID

respBytes, err := json.Marshal(responseAdmissionReview)
if err != nil {
logrus.Error(err)
}
if _, err := w.Write(respBytes); err != nil {
logrus.Error(err)
}
}
}

// l keeps the tree legible
func l(fragment string, children ...simplifypath.Node) simplifypath.Node {
return simplifypath.L(fragment, children...)
}

var (
admissionMetrics = metrics.NewMetrics("pod_scaler_admission")
)

func admission(port int, client buildclientv1.BuildV1Interface) {
logger := logrus.WithField("component", "admission")
server := &admissionServer{
logger: logger,
client: client,
}

metrics.ExposeMetrics("pod_scaler_admission", prowConfig.PushGateway{}, flagutil.DefaultMetricsPort)
simplifier := simplifypath.NewSimplifier(l("", // shadow element mimicing the root
l("pods"),
))
handler := metrics.TraceHandler(simplifier, admissionMetrics.HTTPRequestDuration, admissionMetrics.HTTPResponseSize)
mux := http.NewServeMux()
mux.HandleFunc("/pods", handler(serve(mutatePods(server))).ServeHTTP)
httpServer := &http.Server{Addr: ":" + strconv.Itoa(port), Handler: mux}
interrupts.ListenAndServe(httpServer, 5*time.Second)
logger.Debug("Ready to serve HTTP requests.")
}

type admissionServer struct {
logger *logrus.Entry
client buildclientv1.BuildV1Interface
}

func mutatePods(server *admissionServer) admitFunc {
return func(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
logger := server.logger.WithField("admission_uid", ar.Request.UID)
podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
if ar.Request.Resource != podResource {
logger.Errorf("expect resource to be %s", podResource)
return toAdmissionResponse(fmt.Errorf("got incorrect resource %q, wanted %q", ar.Request.Resource, podResource))
}

raw := ar.Request.Object.Raw
pod := corev1.Pod{}
deserializer := codecs.UniversalDeserializer()
if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
logger.Error(err)
return toAdmissionResponse(err)
}
logger = logger.WithFields(logrus.Fields{
"namespace": pod.Namespace,
"name": pod.Name,
})
reviewResponse := admissionv1.AdmissionResponse{}
reviewResponse.Allowed = true
mutated := pod.DeepCopy()
addMissingLabels(server.client, mutated, server.logger)
patch, err := patchFor(&pod, mutated)
if err != nil {
logger.WithError(err).Warn("Failed to determine JSON patch for mutation.")
return &reviewResponse
}
if string(patch) == `{}` {
return &reviewResponse
}
reviewResponse.Patch = patch
pt := admissionv1.PatchTypeJSONPatch
reviewResponse.PatchType = &pt
return &reviewResponse
}
}

func addMissingLabels(client buildclientv1.BuildV1Interface, pod *corev1.Pod, logger *logrus.Entry) {
buildName, isBuildPod := pod.Labels[buildv1.BuildLabel]
if !isBuildPod {
return
}
logger = logger.WithField("build", buildName)
logger.Debug("Handling labels on Pod created for a Build.")
build, err := client.Builds(pod.Namespace).Get(interrupts.Context(), buildName, metav1.GetOptions{})
if err != nil {
logger.WithError(err).Error("Could not get Build for Pod.")
return
}
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
for _, label := range []string{steps.LabelMetadataOrg, steps.LabelMetadataRepo, steps.LabelMetadataBranch, steps.LabelMetadataVariant, steps.LabelMetadataTarget, steps.LabelMetadataStep} {
buildValue, buildHas := build.Labels[label]
_, podHas := pod.Labels[label]
if buildHas && !podHas {
pod.Labels[label] = buildValue
}
}
}

func patchFor(old, new *corev1.Pod) ([]byte, error) {
oldData, err := json.Marshal(old)
if err != nil {
return []byte{}, err
}
newData, err := json.Marshal(new)
if err != nil {
return []byte{}, err
}
return jsonpatch.CreateMergePatch(oldData, newData)
}
120 changes: 120 additions & 0 deletions cmd/pod-scaler/admission_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package main

import (
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/sirupsen/logrus"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

buildv1 "github.com/openshift/api/build/v1"
fakebuildv1client "github.com/openshift/client-go/build/clientset/versioned/fake"

"github.com/openshift/ci-tools/pkg/testhelper"
)

func TestMutatePods(t *testing.T) {
client := fakebuildv1client.NewSimpleClientset(
&buildv1.Build{
TypeMeta: metav1.TypeMeta{
Kind: "Build",
APIVersion: "build.openshift.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "withoutlabels",
Labels: map[string]string{},
},
},
&buildv1.Build{
TypeMeta: metav1.TypeMeta{
Kind: "Build",
APIVersion: "build.openshift.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "withlabels",
Labels: map[string]string{
"ci.openshift.io/metadata.org": "org",
"ci.openshift.io/metadata.repo": "repo",
"ci.openshift.io/metadata.branch": "branch",
"ci.openshift.io/metadata.variant": "variant",
"ci.openshift.io/metadata.target": "target",
"ci.openshift.io/metadata.step": "step",
},
},
},
)

a := &admissionServer{
logger: logrus.WithField("test", t.Name()),
client: client.BuildV1(),
}
server := httptest.NewServer(serve(mutatePods(a)))
defer server.Close()

var testCases = []struct {
name string
request string
response string
}{
{
name: "pod not associated with a build",
request: `{
"apiVersion": "admission.k8s.io/v1","kind": "AdmissionReview",
"request": {
"uid": "705ab4f5-6393-11e8-b7cc-42010a800002",
"resource": {"group":"","version":"v1","resource":"pods"},
"object": {"apiVersion": "v1","kind": "Pod","metadata": {"name": "somethingelse","namespace": "namespace"}}
}
}`,
},
{
name: "pod associated with a build that has no labels",
request: `{"apiVersion": "admission.k8s.io/v1","kind": "AdmissionReview",
"request": {
"uid": "705ab4f5-6393-11e8-b7cc-42010a800002",
"resource": {"group":"","version":"v1","resource":"pods"},
"object": {"apiVersion": "v1","kind": "Pod","metadata": {"labels": {"openshift.io/build.name": "withoutlabels"}, "name": "withoutlabels-build","namespace": "namespace"}}
}
}`,
},
{
name: "pod associated with a build with labels",
request: `{"apiVersion": "admission.k8s.io/v1","kind": "AdmissionReview",
"request": {
"uid": "705ab4f5-6393-11e8-b7cc-42010a800002",
"resource": {"group":"","version":"v1","resource":"pods"},
"object": {"apiVersion": "v1","kind": "Pod","metadata": {"labels": {"openshift.io/build.name": "withlabels"}, "name": "withoutlabels-build","namespace": "namespace"}}
}
}`,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
httpClient := &http.Client{}
req, err := http.NewRequest(http.MethodPut, server.URL+"/pods", strings.NewReader(testCase.request))
if err != nil {
t.Fatalf("%s: could not create request: %v", testCase.name, err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := httpClient.Do(req)
if err != nil {
t.Fatalf("%s: got unexpected error from server: %v", testCase.name, err)
}
response, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("%s: got unexpected error reading response: %v", testCase.name, err)
}
if err := resp.Body.Close(); err != nil {
t.Fatalf("%s: got could not close response body: %v", testCase.name, err)
}
testhelper.CompareWithFixture(t, response)
})
}
}

0 comments on commit ea1799e

Please sign in to comment.