Skip to content

Commit

Permalink
refactor(lifecycle-operator): eventing and telemetry (#1844)
Browse files Browse the repository at this point in the history
  • Loading branch information
thisthat committed Aug 7, 2023
1 parent fc976eb commit 0130576
Show file tree
Hide file tree
Showing 34 changed files with 146 additions and 111 deletions.
11 changes: 6 additions & 5 deletions lifecycle-operator/controllers/common/evaluationhandler.go
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/go-logr/logr"
klcv1alpha3 "github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3"
apicommon "github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3/common"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/common/telemetry"
controllererrors "github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/errors"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/lifecycle/interfaces"
"go.opentelemetry.io/otel/codes"
Expand All @@ -21,11 +22,11 @@ import (

type EvaluationHandler struct {
client.Client
EventSender EventSender
EventSender IEvent
Log logr.Logger
Tracer trace.Tracer
Scheme *runtime.Scheme
SpanHandler ISpanHandler
SpanHandler telemetry.ISpanHandler
}

type CreateEvaluationAttributes struct {
Expand Down Expand Up @@ -55,7 +56,7 @@ func (r EvaluationHandler) ReconcileEvaluations(ctx context.Context, phaseCtx co
evaluationExists := false

if oldstatus != evaluationStatus.Status {
r.EventSender.SendK8sEvent(apicommon.PhaseReconcileEvaluation, "Normal", reconcileObject, apicommon.PhaseStateStatusChanged, fmt.Sprintf("evaluation status changed from %s to %s", oldstatus, evaluationStatus.Status), piWrapper.GetVersion())
r.EventSender.Emit(apicommon.PhaseReconcileEvaluation, "Normal", reconcileObject, apicommon.PhaseStateStatusChanged, fmt.Sprintf("evaluation status changed from %s to %s", oldstatus, evaluationStatus.Status), piWrapper.GetVersion())
}

// Check if evaluation has already succeeded or failed
Expand Down Expand Up @@ -126,7 +127,7 @@ func (r EvaluationHandler) CreateKeptnEvaluation(ctx context.Context, namespace
err = r.Client.Create(ctx, &newEvaluation)
if err != nil {
r.Log.Error(err, "could not create KeptnEvaluation")
r.EventSender.SendK8sEvent(phase, "Warning", reconcileObject, apicommon.PhaseStateFailed, "could not create KeptnEvaluation", piWrapper.GetVersion())
r.EventSender.Emit(phase, "Warning", reconcileObject, apicommon.PhaseStateFailed, "could not create KeptnEvaluation", piWrapper.GetVersion())
return "", err
}

Expand All @@ -142,7 +143,7 @@ func (r EvaluationHandler) emitEvaluationFailureEvents(evaluation *klcv1alpha3.K
k8sEventMessage = fmt.Sprintf("%s\n%s", k8sEventMessage, msg)
}
}
r.EventSender.SendK8sEvent(apicommon.PhaseReconcileEvaluation, "Warning", evaluation, apicommon.PhaseStateFailed, k8sEventMessage, piWrapper.GetVersion())
r.EventSender.Emit(apicommon.PhaseReconcileEvaluation, "Warning", evaluation, apicommon.PhaseStateFailed, k8sEventMessage, piWrapper.GetVersion())
}

func (r EvaluationHandler) setupEvaluations(evaluationCreateAttributes CreateEvaluationAttributes, piWrapper *interfaces.PhaseItemWrapper) ([]string, []klcv1alpha3.ItemStatus) {
Expand Down
30 changes: 23 additions & 7 deletions lifecycle-operator/controllers/common/eventsender.go
Expand Up @@ -8,19 +8,35 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

type EventSender struct {
//go:generate moq -pkg fake -skip-ensure -out ./fake/event_mock.go . IEvent:MockEvent
type IEvent interface {
Emit(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, status string, message string, version string)
}

// ===== Main =====

func NewEventSender(recorder record.EventRecorder) IEvent {
return newK8sSender(recorder)
}

// ===== Cloud Event Sender =====
// TODO: implement Cloud Event logic

// ===== K8s Event Sender =====

type k8sEvent struct {
recorder record.EventRecorder
}

func NewEventSender(recorder record.EventRecorder) EventSender {
return EventSender{
func newK8sSender(recorder record.EventRecorder) IEvent {
return &k8sEvent{
recorder: recorder,
}
}

// SendK8sEvent creates k8s Event and adds it to Eventqueue
func (s *EventSender) SendK8sEvent(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, shortReason string, longReason string, version string) {
msg := setEventMessage(phase, reconcileObject, longReason, version)
// SendEvent creates k8s Event and adds it to Eventqueue
func (s *k8sEvent) Emit(phase apicommon.KeptnPhaseType, eventType string, reconcileObject client.Object, status string, message string, version string) {
msg := setEventMessage(phase, reconcileObject, message, version)
annotations := setAnnotations(reconcileObject, phase)
s.recorder.AnnotatedEventf(reconcileObject, annotations, eventType, fmt.Sprintf("%s%s", phase.ShortName, shortReason), msg)
s.recorder.AnnotatedEventf(reconcileObject, annotations, eventType, fmt.Sprintf("%s%s", phase.ShortName, status), msg)
}
4 changes: 2 additions & 2 deletions lifecycle-operator/controllers/common/eventsender_test.go
Expand Up @@ -13,9 +13,9 @@ import (

func TestEventSender_SendK8sEvent(t *testing.T) {
fakeRecorder := record.NewFakeRecorder(100)
eventSender := NewEventSender(fakeRecorder)
eventSender := newK8sSender(fakeRecorder)

eventSender.SendK8sEvent(common.PhaseAppDeployment, "pre-event", &v1alpha3.KeptnAppVersion{
eventSender.Emit(common.PhaseAppDeployment, "pre-event", &v1alpha3.KeptnAppVersion{
ObjectMeta: v1.ObjectMeta{
Name: "app",
Namespace: "ns",
Expand Down
13 changes: 7 additions & 6 deletions lifecycle-operator/controllers/common/phasehandler.go
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/go-logr/logr"
apicommon "github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3/common"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/common/telemetry"
controllererrors "github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/errors"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/lifecycle/interfaces"
"go.opentelemetry.io/otel/codes"
Expand All @@ -16,9 +17,9 @@ import (

type PhaseHandler struct {
client.Client
EventSender EventSender
EventSender IEvent
Log logr.Logger
SpanHandler ISpanHandler
SpanHandler telemetry.ISpanHandler
}

type PhaseResult struct {
Expand All @@ -39,7 +40,7 @@ func (r PhaseHandler) HandlePhase(ctx context.Context, ctxTrace context.Context,
return &PhaseResult{Continue: false, Result: ctrl.Result{}}, nil
}
if oldPhase != phase.ShortName {
r.EventSender.SendK8sEvent(phase, "Normal", reconcileObject, apicommon.PhaseStateStarted, "has started", piWrapper.GetVersion())
r.EventSender.Emit(phase, "Normal", reconcileObject, apicommon.PhaseStateStarted, "has started", piWrapper.GetVersion())
piWrapper.SetCurrentPhase(phase.ShortName)
}

Expand All @@ -51,7 +52,7 @@ func (r PhaseHandler) HandlePhase(ctx context.Context, ctxTrace context.Context,
state, err := reconcilePhase(spanPhaseCtx)
if err != nil {
spanPhaseTrace.AddEvent(phase.LongName + " could not get reconciled")
r.EventSender.SendK8sEvent(phase, "Warning", reconcileObject, apicommon.PhaseStateReconcileError, "could not get reconciled", piWrapper.GetVersion())
r.EventSender.Emit(phase, "Warning", reconcileObject, apicommon.PhaseStateReconcileError, "could not get reconciled", piWrapper.GetVersion())
span.SetStatus(codes.Error, err.Error())
return &PhaseResult{Continue: false, Result: requeueResult}, err
}
Expand Down Expand Up @@ -88,7 +89,7 @@ func (r PhaseHandler) handleCompletedPhase(state apicommon.KeptnState, piWrapper
if err := r.SpanHandler.UnbindSpan(reconcileObject, phase.ShortName); err != nil {
r.Log.Error(err, controllererrors.ErrCouldNotUnbindSpan, reconcileObject.GetName())
}
r.EventSender.SendK8sEvent(phase, "Warning", reconcileObject, apicommon.PhaseStateFailed, "has failed", piWrapper.GetVersion())
r.EventSender.Emit(phase, "Warning", reconcileObject, apicommon.PhaseStateFailed, "has failed", piWrapper.GetVersion())
piWrapper.DeprecateRemainingPhases(phase)
return &PhaseResult{Continue: false, Result: ctrl.Result{}}, nil
}
Expand All @@ -100,7 +101,7 @@ func (r PhaseHandler) handleCompletedPhase(state apicommon.KeptnState, piWrapper
if err := r.SpanHandler.UnbindSpan(reconcileObject, phase.ShortName); err != nil {
r.Log.Error(err, controllererrors.ErrCouldNotUnbindSpan, reconcileObject.GetName())
}
r.EventSender.SendK8sEvent(phase, "Normal", reconcileObject, apicommon.PhaseStateFinished, "has finished", piWrapper.GetVersion())
r.EventSender.Emit(phase, "Normal", reconcileObject, apicommon.PhaseStateFinished, "has finished", piWrapper.GetVersion())

return &PhaseResult{Continue: true, Result: ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}}, nil
}
15 changes: 8 additions & 7 deletions lifecycle-operator/controllers/common/phasehandler_test.go
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3"
apicommon "github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3/common"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/common/telemetry"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,7 +34,7 @@ func TestPhaseHandler(t *testing.T) {
{
name: "deprecated",
handler: PhaseHandler{
SpanHandler: &SpanHandler{},
SpanHandler: &telemetry.SpanHandler{},
},
object: &v1alpha3.KeptnAppVersion{
Status: v1alpha3.KeptnAppVersionStatus{
Expand All @@ -51,7 +52,7 @@ func TestPhaseHandler(t *testing.T) {
{
name: "reconcilePhase error",
handler: PhaseHandler{
SpanHandler: &SpanHandler{},
SpanHandler: &telemetry.SpanHandler{},
Log: ctrl.Log.WithName("controller"),
EventSender: NewEventSender(record.NewFakeRecorder(100)),
Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build(),
Expand All @@ -78,7 +79,7 @@ func TestPhaseHandler(t *testing.T) {
{
name: "reconcilePhase pending state",
handler: PhaseHandler{
SpanHandler: &SpanHandler{},
SpanHandler: &telemetry.SpanHandler{},
Log: ctrl.Log.WithName("controller"),
EventSender: NewEventSender(record.NewFakeRecorder(100)),
Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build(),
Expand All @@ -105,7 +106,7 @@ func TestPhaseHandler(t *testing.T) {
{
name: "reconcilePhase progressing state",
handler: PhaseHandler{
SpanHandler: &SpanHandler{},
SpanHandler: &telemetry.SpanHandler{},
Log: ctrl.Log.WithName("controller"),
EventSender: NewEventSender(record.NewFakeRecorder(100)),
Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build(),
Expand All @@ -132,7 +133,7 @@ func TestPhaseHandler(t *testing.T) {
{
name: "reconcilePhase succeeded state",
handler: PhaseHandler{
SpanHandler: &SpanHandler{},
SpanHandler: &telemetry.SpanHandler{},
Log: ctrl.Log.WithName("controller"),
EventSender: NewEventSender(record.NewFakeRecorder(100)),
Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build(),
Expand All @@ -159,7 +160,7 @@ func TestPhaseHandler(t *testing.T) {
{
name: "reconcilePhase failed state",
handler: PhaseHandler{
SpanHandler: &SpanHandler{},
SpanHandler: &telemetry.SpanHandler{},
Log: ctrl.Log.WithName("controller"),
EventSender: NewEventSender(record.NewFakeRecorder(100)),
Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build(),
Expand Down Expand Up @@ -187,7 +188,7 @@ func TestPhaseHandler(t *testing.T) {
{
name: "reconcilePhase unknown state",
handler: PhaseHandler{
SpanHandler: &SpanHandler{},
SpanHandler: &telemetry.SpanHandler{},
Log: ctrl.Log.WithName("controller"),
EventSender: NewEventSender(record.NewFakeRecorder(100)),
Client: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build(),
Expand Down
9 changes: 5 additions & 4 deletions lifecycle-operator/controllers/common/taskhandler.go
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/go-logr/logr"
klcv1alpha3 "github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3"
apicommon "github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3/common"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/common/telemetry"
controllererrors "github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/errors"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/lifecycle/interfaces"
"go.opentelemetry.io/otel/codes"
Expand All @@ -21,11 +22,11 @@ import (

type TaskHandler struct {
client.Client
EventSender EventSender
EventSender IEvent
Log logr.Logger
Tracer trace.Tracer
Scheme *runtime.Scheme
SpanHandler ISpanHandler
SpanHandler telemetry.ISpanHandler
}

type CreateTaskAttributes struct {
Expand Down Expand Up @@ -57,7 +58,7 @@ func (r TaskHandler) ReconcileTasks(ctx context.Context, phaseCtx context.Contex
taskExists := false

if oldstatus != taskStatus.Status {
r.EventSender.SendK8sEvent(phase, "Normal", reconcileObject, apicommon.PhaseStateStatusChanged, fmt.Sprintf("task status changed from %s to %s", oldstatus, taskStatus.Status), piWrapper.GetVersion())
r.EventSender.Emit(phase, "Normal", reconcileObject, apicommon.PhaseStateStatusChanged, fmt.Sprintf("task status changed from %s to %s", oldstatus, taskStatus.Status), piWrapper.GetVersion())
}

// Check if task has already succeeded or failed
Expand Down Expand Up @@ -130,7 +131,7 @@ func (r TaskHandler) CreateKeptnTask(ctx context.Context, namespace string, reco
err = r.Client.Create(ctx, &newTask)
if err != nil {
r.Log.Error(err, "could not create KeptnTask")
r.EventSender.SendK8sEvent(phase, "Warning", reconcileObject, apicommon.PhaseStateFailed, "could not create KeptnTask", piWrapper.GetVersion())
r.EventSender.Emit(phase, "Warning", reconcileObject, apicommon.PhaseStateFailed, "could not create KeptnTask", piWrapper.GetVersion())
return "", err
}

Expand Down
@@ -1,4 +1,4 @@
package common
package telemetry

import (
"context"
Expand Down
@@ -1,4 +1,4 @@
package common
package telemetry

import (
"context"
Expand Down
@@ -1,4 +1,4 @@
package common
package telemetry

import (
"context"
Expand Down
@@ -1,4 +1,4 @@
package common
package telemetry

import (
"net"
Expand Down
@@ -1,4 +1,4 @@
package common
package telemetry

import (
"context"
Expand Down
@@ -1,4 +1,4 @@
package common
package telemetry

import (
"context"
Expand Down
@@ -1,4 +1,4 @@
package common
package telemetry

import "go.opentelemetry.io/otel/trace"

Expand Down
11 changes: 6 additions & 5 deletions lifecycle-operator/controllers/lifecycle/keptnapp/controller.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3/common"
operatorcommon "github.com/keptn/lifecycle-toolkit/lifecycle-operator/common"
controllercommon "github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/common"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/common/telemetry"
controllererrors "github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
Expand All @@ -47,9 +48,9 @@ const traceComponentName = "keptn/lifecycle-operator/app"
type KeptnAppReconciler struct {
client.Client
Scheme *runtime.Scheme
EventSender controllercommon.EventSender
EventSender controllercommon.IEvent
Log logr.Logger
TracerFactory controllercommon.TracerFactory
TracerFactory telemetry.TracerFactory
}

// +kubebuilder:rbac:groups=lifecycle.keptn.sh,resources=keptnapps,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -105,7 +106,7 @@ func (r *KeptnAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if err != nil {
r.Log.Error(err, "could not create AppVersion")
span.SetStatus(codes.Error, err.Error())
r.EventSender.SendK8sEvent(common.PhaseCreateAppVersion, "Warning", appVersion, common.PhaseStateFailed, "Could not create KeptnAppVersion", appVersion.Spec.Version)
r.EventSender.Emit(common.PhaseCreateAppVersion, "Warning", appVersion, common.PhaseStateFailed, "Could not create KeptnAppVersion", appVersion.Spec.Version)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -171,7 +172,7 @@ func (r *KeptnAppReconciler) handleGenerationBump(ctx context.Context, app *klcv
if app.Generation != 1 {
if err := r.deprecateAppVersions(ctx, app); err != nil {
r.Log.Error(err, "could not deprecate appVersions for appVersion %s", app.GetAppVersionName())
r.EventSender.SendK8sEvent(common.PhaseDeprecateAppVersion, "Warning", app, common.PhaseStateFailed, fmt.Sprintf("could not deprecate outdated revisions of KeptnAppVersion: %s", app.GetAppVersionName()), app.Spec.Version)
r.EventSender.Emit(common.PhaseDeprecateAppVersion, "Warning", app, common.PhaseStateFailed, fmt.Sprintf("could not deprecate outdated revisions of KeptnAppVersion: %s", app.GetAppVersionName()), app.Spec.Version)
return err
}
}
Expand Down Expand Up @@ -200,6 +201,6 @@ func (r *KeptnAppReconciler) deprecateAppVersions(ctx context.Context, app *klcv
return lastResultErr
}

func (r *KeptnAppReconciler) getTracer() controllercommon.ITracer {
func (r *KeptnAppReconciler) getTracer() telemetry.ITracer {
return r.TracerFactory.GetTracer(traceComponentName)
}
Expand Up @@ -25,6 +25,7 @@ import (
klcv1alpha3 "github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3"
apicommon "github.com/keptn/lifecycle-toolkit/lifecycle-operator/apis/lifecycle/v1alpha3/common"
controllercommon "github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/common"
"github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/common/telemetry"
controllererrors "github.com/keptn/lifecycle-toolkit/lifecycle-operator/controllers/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
Expand All @@ -47,10 +48,10 @@ type KeptnAppVersionReconciler struct {
Scheme *runtime.Scheme
client.Client
Log logr.Logger
EventSender controllercommon.EventSender
TracerFactory controllercommon.TracerFactory
EventSender controllercommon.IEvent
TracerFactory telemetry.TracerFactory
Meters apicommon.KeptnMeters
SpanHandler controllercommon.ISpanHandler
SpanHandler telemetry.ISpanHandler
}

// +kubebuilder:rbac:groups=lifecycle.keptn.sh,resources=keptnappversions,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -181,7 +182,7 @@ func (r *KeptnAppVersionReconciler) finishKeptnAppVersionReconcile(ctx context.C
return ctrl.Result{Requeue: true}, err
}

r.EventSender.SendK8sEvent(apicommon.PhaseAppCompleted, "Normal", appVersion, apicommon.PhaseStateFinished, "has finished", appVersion.GetVersion())
r.EventSender.Emit(apicommon.PhaseAppCompleted, "Normal", appVersion, apicommon.PhaseStateFinished, "has finished", appVersion.GetVersion())

attrs := appVersion.GetMetricsAttributes()

Expand Down
Expand Up @@ -41,7 +41,7 @@ func (r *KeptnAppVersionReconciler) reconcileWorkloads(ctx context.Context, appV
}

if !found {
r.EventSender.SendK8sEvent(phase, "Warning", appVersion, apicommon.PhaseStateNotFound, fmt.Sprintf("could not find KeptnWorkloadInstance for KeptnWorkload: %s ", w.Name), appVersion.GetVersion())
r.EventSender.Emit(phase, "Warning", appVersion, apicommon.PhaseStateNotFound, fmt.Sprintf("could not find KeptnWorkloadInstance for KeptnWorkload: %s ", w.Name), appVersion.GetVersion())
}

newStatus = append(newStatus, klcv1alpha3.WorkloadStatus{
Expand Down

0 comments on commit 0130576

Please sign in to comment.