From a4cf4ad7550a6e2b81305470d0b9dddd2be95e6c Mon Sep 17 00:00:00 2001 From: "Zhongduo Lin (Jimmy)" Date: Mon, 5 Oct 2020 15:38:34 -0400 Subject: [PATCH] Remove gpubsub in sources (#1761) * (feat/source): Remove gpubsub from Topic and PullSubscription Topic and PullSubscription reconcilers still use the gpubsub client, making their tests different from the Broker and Trigger tests which use pstest. With this, error injection is done by setting server option to the pubsub testing server. The reconcilers will use google-cloud-go pubsub directly. * Fix static pullsubscription, no subscription check yet * add subscription check to pullsubscription_test.go * apply change to keda pullsubscription * Use create function in topic * use create function for pullsubscription * remove helper as they are not used here * Add connection cleanup goroutine, fix typo --- .../pullsubscription/keda/controller.go | 4 +- .../keda/pullsubscription_test.go | 177 +++-- .../intevents/pullsubscription/reconciler.go | 7 +- .../pullsubscription/static/controller.go | 4 +- .../static/pullsubscription_test.go | 195 +++--- pkg/reconciler/intevents/topic/controller.go | 4 +- pkg/reconciler/intevents/topic/topic.go | 5 +- pkg/reconciler/intevents/topic/topic_test.go | 645 ++++++++++-------- pkg/reconciler/testing/pstest.go | 37 +- pkg/reconciler/utils/pubsub/types.go | 7 + 10 files changed, 626 insertions(+), 459 deletions(-) diff --git a/pkg/reconciler/intevents/pullsubscription/keda/controller.go b/pkg/reconciler/intevents/pullsubscription/keda/controller.go index 2e04fb26f3..64fc57e6b9 100644 --- a/pkg/reconciler/intevents/pullsubscription/keda/controller.go +++ b/pkg/reconciler/intevents/pullsubscription/keda/controller.go @@ -21,6 +21,7 @@ import ( "knative.dev/pkg/injection" + "cloud.google.com/go/pubsub" "go.uber.org/zap" "k8s.io/client-go/discovery" "k8s.io/client-go/tools/cache" @@ -31,7 +32,6 @@ import ( "github.com/google/knative-gcp/pkg/client/injection/ducks/duck/v1/resource" pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1/pullsubscription" pullsubscriptionreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/pullsubscription" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" @@ -104,7 +104,7 @@ func newController( DeploymentLister: deploymentInformer.Lister(), PullSubscriptionLister: pullSubscriptionInformer.Lister(), ReceiveAdapterImage: env.ReceiveAdapter, - CreateClientFn: gpubsub.NewClient, + CreateClientFn: pubsub.NewClient, ControllerAgentName: controllerAgentName, ResourceGroup: resourceGroup, }, diff --git a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription_test.go b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription_test.go index ce75a7bd1c..acc9629b20 100644 --- a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription_test.go +++ b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription_test.go @@ -18,12 +18,16 @@ package keda import ( "context" - "errors" "fmt" "strings" "testing" + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/pstest" reconcilertestingv1 "github.com/google/knative-gcp/pkg/reconciler/testing/v1" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -50,13 +54,13 @@ import ( pubsubv1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" "github.com/google/knative-gcp/pkg/client/injection/ducks/duck/v1/resource" "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/pullsubscription" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/intevents" psreconciler "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription" . "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/keda/resources" "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/resources" . "github.com/google/knative-gcp/pkg/reconciler/testing" + reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub" ) const ( @@ -277,9 +281,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: client-create-induced-error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - CreateClientErr: errors.New("client-create-induced-error"), - }, + "client-error": "client-create-induced-error", }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, @@ -332,14 +334,11 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: topic-exists-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - ExistsErr: errors.New("topic-exists-induced-error"), - }, - }, + // GetTopic has a retry policy for Unknown status type, so we use Internal error instead. + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("GetTopic", codes.Internal, "Injected error")}, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, @@ -360,13 +359,16 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "topic-exists-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, }, { Name: "topic does not exist", Objects: []runtime.Object{ @@ -394,13 +396,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: Topic %q does not exist", testTopicID), }, - OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: false, - }, - }, - }, + OtherTestData: map[string]interface{}{}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, reconcilertestingv1.WithPullSubscriptionUID(sourceUID), @@ -427,6 +423,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, }, { Name: "subscription exists fails", Objects: []runtime.Object{ @@ -452,14 +451,10 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: subscription-exists-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - SubscriptionData: gpubsub.TestSubscriptionData{ - ExistsErr: errors.New("subscription-exists-induced-error"), - }, - }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("GetSubscription", codes.Internal, "Injected error")}, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, @@ -480,7 +475,7 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "subscription-exists-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, @@ -512,15 +507,13 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: subscription-create-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - CreateSubscriptionErr: errors.New("subscription-create-induced-error"), + "pre": []PubsubAction{ + Topic(testTopicID), }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("CreateSubscription", codes.Internal, "Injected error")}, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, @@ -541,7 +534,7 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "subscription-create-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, @@ -561,10 +554,8 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "PullSubscriptionReconciled", `PullSubscription reconciled: "%s/%s"`, testNS, sourceName), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, WantCreates: []runtime.Object{ @@ -599,6 +590,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "successful create - reuse existing receive adapter - match", Objects: []runtime.Object{ @@ -621,10 +615,8 @@ func TestAllCases(t *testing.T) { newAvailableReceiveAdapter(context.Background(), testImage, nil), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, WantCreates: []runtime.Object{ @@ -662,6 +654,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "successful create - reuse existing receive adapter - mismatch", Objects: []runtime.Object{ @@ -689,10 +684,8 @@ func TestAllCases(t *testing.T) { newScaledObject(newPullSubscription(testSubscriptionID)), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -735,6 +728,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "deleting - failed to delete subscription", Objects: []runtime.Object{ @@ -753,25 +749,21 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSubscribed(testSubscriptionID), reconcilertestingv1.WithPullSubscriptionMarkDeployed(deploymentName(testSubscriptionID), testNS), reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), + reconcilertestingv1.WithPullSubscriptionProjectID(testProject), reconcilertestingv1.WithPullSubscriptionDeleted, reconcilertestingv1.WithPullSubscriptionSetDefaults, ), newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - SubscriptionData: gpubsub.TestSubscriptionData{ - Exists: true, - DeleteErr: errors.New("subscription-delete-induced-error"), - }, + "pre": []PubsubAction{ + TopicAndSub(testTopicID, testSubscriptionID), }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("DeleteSubscription", codes.Unknown, "Injected error")}, }, Key: testNS + "/" + sourceName, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "SubscriptionDeleteFailed", "Failed to delete Pub/Sub subscription: subscription-delete-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionDeleteFailed", "Failed to delete Pub/Sub subscription: rpc error: code = Unknown desc = Injected error"), }, WantStatusUpdates: nil, }, { @@ -797,12 +789,13 @@ func TestAllCases(t *testing.T) { newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), @@ -862,10 +855,8 @@ func TestAllCases(t *testing.T) { newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -907,6 +898,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "update receiver adapter fails", Objects: []runtime.Object{ @@ -931,10 +925,8 @@ func TestAllCases(t *testing.T) { newReceiveAdapter(context.Background(), "old"+testImage, nil), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -981,6 +973,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "successfully deleted subscription", Objects: []runtime.Object{ @@ -999,22 +994,20 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSubscribed(testSubscriptionID), reconcilertestingv1.WithPullSubscriptionMarkDeployed(deploymentName(testSubscriptionID), testNS), reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), - reconcilertestingv1.WithPullSubscriptionSubscriptionID(""), + reconcilertestingv1.WithPullSubscriptionProjectID(testProject), reconcilertestingv1.WithPullSubscriptionDeleted, reconcilertestingv1.WithPullSubscriptionSetDefaults, ), newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - SubscriptionData: gpubsub.TestSubscriptionData{ - Exists: true, - }, + "pre": []PubsubAction{ + TopicAndSub(testTopicID, testSubscriptionID), }, }, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, Key: testNS + "/" + sourceName, WantEvents: nil, WantStatusUpdates: nil, @@ -1023,6 +1016,40 @@ func TestAllCases(t *testing.T) { table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, testData map[string]interface{}) controller.Reconciler { ctx = addressable.WithDuck(ctx) ctx = resource.WithDuck(ctx) + opts := []pstest.ServerReactorOption{} + if testData != nil && testData["server-options"] != nil { + opts = testData["server-options"].([]pstest.ServerReactorOption) + } + srv := pstest.NewServer(opts...) + + psclient, _ := GetTestClientCreateFunc(srv.Addr)(ctx, testProject) + conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) + if err != nil { + panic(fmt.Errorf("failed to dial test pubsub connection: %v", err)) + } + close := func() { + srv.Close() + conn.Close() + } + t.Cleanup(close) + if testData != nil { + InjectPubsubClient(testData, psclient) + if testData["pre"] != nil { + fixtures := testData["pre"].([]PubsubAction) + for _, f := range fixtures { + f(ctx, t, psclient) + } + } + } + // use normal create function or always error one + var createClientFn reconcilerutilspubsub.CreateFn + if testData != nil && testData["client-error"] != nil { + createClientFn = func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) { + return nil, fmt.Errorf(testData["client-error"].(string)) + } + } else { + createClientFn = GetTestClientCreateFunc(srv.Addr) + } pubsubBase := &intevents.PubSubBase{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), } @@ -1033,7 +1060,7 @@ func TestAllCases(t *testing.T) { PullSubscriptionLister: listers.GetPullSubscriptionLister(), UriResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), ReceiveAdapterImage: testImage, - CreateClientFn: gpubsub.TestClientCreator(testData["ps"]), + CreateClientFn: createClientFn, ControllerAgentName: controllerAgentName, ResourceGroup: resourceGroup, }, diff --git a/pkg/reconciler/intevents/pullsubscription/reconciler.go b/pkg/reconciler/intevents/pullsubscription/reconciler.go index fa5625503b..fae07d6d95 100644 --- a/pkg/reconciler/intevents/pullsubscription/reconciler.go +++ b/pkg/reconciler/intevents/pullsubscription/reconciler.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "cloud.google.com/go/pubsub" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -44,10 +45,10 @@ import ( v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" listers "github.com/google/knative-gcp/pkg/client/listers/intevents/v1" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/intevents" "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/resources" + reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub" "github.com/google/knative-gcp/pkg/tracing" ) @@ -92,7 +93,7 @@ type Base struct { // CreateClientFn is the function used to create the Pub/Sub client that interacts with Pub/Sub. // This is needed so that we can inject a mock client for UTs purposes. - CreateClientFn gpubsub.CreateFn + CreateClientFn reconcilerutilspubsub.CreateFn // ReconcileDataPlaneFn is the function used to reconcile the data plane resources. ReconcileDataPlaneFn ReconcileDataPlaneFunc @@ -196,7 +197,7 @@ func (r *Base) reconcileSubscription(ctx context.Context, ps *v1.PullSubscriptio } // subConfig is the wanted config based on settings. - subConfig := gpubsub.SubscriptionConfig{ + subConfig := pubsub.SubscriptionConfig{ Topic: t, RetainAckedMessages: ps.Spec.RetainAckedMessages, } diff --git a/pkg/reconciler/intevents/pullsubscription/static/controller.go b/pkg/reconciler/intevents/pullsubscription/static/controller.go index 3eb52d0b91..6d6a5dc91f 100644 --- a/pkg/reconciler/intevents/pullsubscription/static/controller.go +++ b/pkg/reconciler/intevents/pullsubscription/static/controller.go @@ -21,11 +21,11 @@ import ( "knative.dev/pkg/injection" + "cloud.google.com/go/pubsub" "github.com/google/knative-gcp/pkg/apis/configs/gcpauth" "github.com/google/knative-gcp/pkg/apis/duck" v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1/pullsubscription" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" @@ -100,7 +100,7 @@ func newController( DeploymentLister: deploymentInformer.Lister(), PullSubscriptionLister: pullSubscriptionInformer.Lister(), ReceiveAdapterImage: env.ReceiveAdapter, - CreateClientFn: gpubsub.NewClient, + CreateClientFn: pubsub.NewClient, ControllerAgentName: controllerAgentName, ResourceGroup: resourceGroup, }, diff --git a/pkg/reconciler/intevents/pullsubscription/static/pullsubscription_test.go b/pkg/reconciler/intevents/pullsubscription/static/pullsubscription_test.go index 3ec445aade..67466997d0 100644 --- a/pkg/reconciler/intevents/pullsubscription/static/pullsubscription_test.go +++ b/pkg/reconciler/intevents/pullsubscription/static/pullsubscription_test.go @@ -18,12 +18,16 @@ package static import ( "context" - "errors" "fmt" "strings" "testing" + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/pstest" reconcilertestingv1 "github.com/google/knative-gcp/pkg/reconciler/testing/v1" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" v1 "k8s.io/api/apps/v1" @@ -49,12 +53,12 @@ import ( gcpduckv1 "github.com/google/knative-gcp/pkg/apis/duck/v1" pubsubv1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/pullsubscription" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/intevents" psreconciler "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription" "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/resources" . "github.com/google/knative-gcp/pkg/reconciler/testing" + reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub" ) const ( @@ -251,9 +255,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: client-create-induced-error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - CreateClientErr: errors.New("client-create-induced-error"), - }, + "client-error": "client-create-induced-error", }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, @@ -304,14 +306,11 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: topic-exists-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - ExistsErr: errors.New("topic-exists-induced-error"), - }, - }, + // GetTopic has a retry policy for Unknown status type, so we use Internal error instead. + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("GetTopic", codes.Internal, "Injected error")}, }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), @@ -334,10 +333,13 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "topic-exists-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, }, { Name: "topic does not exist", Objects: []runtime.Object{ @@ -364,13 +366,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: Topic %q does not exist", testTopicID), }, - OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: false, - }, - }, - }, + OtherTestData: map[string]interface{}{}, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, @@ -396,6 +392,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, }, { Name: "subscription exists fails", Objects: []runtime.Object{ @@ -420,14 +419,10 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: subscription-exists-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - SubscriptionData: gpubsub.TestSubscriptionData{ - ExistsErr: errors.New("subscription-exists-induced-error"), - }, - }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("GetSubscription", codes.Internal, "Injected error")}, }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), @@ -450,7 +445,7 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "subscription-exists-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, @@ -478,15 +473,13 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: subscription-create-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - CreateSubscriptionErr: errors.New("subscription-create-induced-error"), + "pre": []PubsubAction{ + Topic(testTopicID), }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("CreateSubscription", codes.Internal, "Injected error")}, }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), @@ -509,7 +502,7 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "subscription-create-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, @@ -540,10 +533,8 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "PullSubscriptionReconciled", `PullSubscription reconciled: "%s/%s"`, testNS, sourceName), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, WantCreates: []runtime.Object{ @@ -576,6 +567,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "sink namespace empty, default to the source one", Objects: []runtime.Object{ @@ -606,10 +600,8 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "PullSubscriptionReconciled", `PullSubscription reconciled: "%s/%s"`, testNS, sourceName), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, WantCreates: []runtime.Object{ @@ -642,6 +634,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "sink URI set instead of ref", Objects: []runtime.Object{ @@ -674,10 +669,8 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "PullSubscriptionReconciled", `PullSubscription reconciled: "%s/%s"`, testNS, sourceName), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, WantCreates: []runtime.Object{ @@ -710,6 +703,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "successful create - reuse existing receive adapter - match", Objects: []runtime.Object{ @@ -731,10 +727,8 @@ func TestAllCases(t *testing.T) { newAvailableReceiveAdapter(context.Background(), testImage, nil), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -768,6 +762,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "successful create - reuse existing receive adapter - mismatch", Objects: []runtime.Object{ @@ -791,10 +788,8 @@ func TestAllCases(t *testing.T) { newReceiveAdapter(context.Background(), "old"+testImage, nil), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -837,6 +832,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "get existing receiver adapter fails", Objects: []runtime.Object{ @@ -859,10 +857,8 @@ func TestAllCases(t *testing.T) { newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -899,6 +895,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "create receiver adapter fails", Objects: []runtime.Object{ @@ -921,10 +920,8 @@ func TestAllCases(t *testing.T) { newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -964,6 +961,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "update receiver adapter fails", Objects: []runtime.Object{ @@ -987,10 +987,8 @@ func TestAllCases(t *testing.T) { newReceiveAdapter(context.Background(), "old"+testImage, nil), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -1035,6 +1033,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "deleting - failed to delete subscription", Objects: []runtime.Object{ @@ -1052,25 +1053,21 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSubscribed(testSubscriptionID), reconcilertestingv1.WithPullSubscriptionMarkDeployed(deploymentName(), testNS), reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), + reconcilertestingv1.WithPullSubscriptionProjectID(testProject), reconcilertestingv1.WithPullSubscriptionDeleted, reconcilertestingv1.WithPullSubscriptionSetDefaults, ), newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - SubscriptionData: gpubsub.TestSubscriptionData{ - Exists: true, - DeleteErr: errors.New("subscription-delete-induced-error"), - }, + "pre": []PubsubAction{ + TopicAndSub(testTopicID, testSubscriptionID), }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("DeleteSubscription", codes.Unknown, "Injected error")}, }, Key: testNS + "/" + sourceName, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "SubscriptionDeleteFailed", "Failed to delete Pub/Sub subscription: subscription-delete-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionDeleteFailed", "Failed to delete Pub/Sub subscription: rpc error: code = Unknown desc = Injected error"), }, WantStatusUpdates: nil, }, { @@ -1091,27 +1088,61 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSubscribed(testSubscriptionID), reconcilertestingv1.WithPullSubscriptionMarkDeployed(deploymentName(), testNS), reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), + reconcilertestingv1.WithPullSubscriptionProjectID(testProject), reconcilertestingv1.WithPullSubscriptionDeleted, reconcilertestingv1.WithPullSubscriptionSetDefaults, ), newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - SubscriptionData: gpubsub.TestSubscriptionData{ - Exists: true, - }, + "pre": []PubsubAction{ + TopicAndSub(testTopicID, testSubscriptionID), }, }, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, Key: testNS + "/" + sourceName, WantEvents: nil, }} table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, testData map[string]interface{}) controller.Reconciler { ctx = addressable.WithDuck(ctx) + opts := []pstest.ServerReactorOption{} + if testData != nil && testData["server-options"] != nil { + opts = testData["server-options"].([]pstest.ServerReactorOption) + } + + srv := pstest.NewServer(opts...) + + psclient, _ := GetTestClientCreateFunc(srv.Addr)(ctx, testProject) + conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) + if err != nil { + panic(fmt.Errorf("failed to dial test pubsub connection: %v", err)) + } + close := func() { + srv.Close() + conn.Close() + } + t.Cleanup(close) + if testData != nil { + InjectPubsubClient(testData, psclient) + if testData["pre"] != nil { + fixtures := testData["pre"].([]PubsubAction) + for _, f := range fixtures { + f(ctx, t, psclient) + } + } + } + // use normal create function or always error one + var createClientFn reconcilerutilspubsub.CreateFn + if testData != nil && testData["client-error"] != nil { + createClientFn = func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) { + return nil, fmt.Errorf(testData["client-error"].(string)) + } + } else { + createClientFn = GetTestClientCreateFunc(srv.Addr) + } pubsubBase := &intevents.PubSubBase{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), } @@ -1122,7 +1153,7 @@ func TestAllCases(t *testing.T) { PullSubscriptionLister: listers.GetPullSubscriptionLister(), UriResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), ReceiveAdapterImage: testImage, - CreateClientFn: gpubsub.TestClientCreator(testData["ps"]), + CreateClientFn: createClientFn, ControllerAgentName: controllerAgentName, ResourceGroup: resourceGroup, }, diff --git a/pkg/reconciler/intevents/topic/controller.go b/pkg/reconciler/intevents/topic/controller.go index 9acc7e0daa..8971b52fb3 100644 --- a/pkg/reconciler/intevents/topic/controller.go +++ b/pkg/reconciler/intevents/topic/controller.go @@ -29,10 +29,10 @@ import ( "knative.dev/pkg/logging" tracingconfig "knative.dev/pkg/tracing/config" + "cloud.google.com/go/pubsub" "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" "github.com/google/knative-gcp/pkg/apis/configs/gcpauth" v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" @@ -96,7 +96,7 @@ func newController( topicLister: topicInformer.Lister(), serviceLister: serviceInformer.Lister(), publisherImage: env.Publisher, - createClientFn: gpubsub.NewClient, + createClientFn: pubsub.NewClient, } impl := topicreconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/intevents/topic/topic.go b/pkg/reconciler/intevents/topic/topic.go index fe135ff233..a08480b4c7 100644 --- a/pkg/reconciler/intevents/topic/topic.go +++ b/pkg/reconciler/intevents/topic/topic.go @@ -47,10 +47,10 @@ import ( v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" topicreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/topic" listers "github.com/google/knative-gcp/pkg/client/listers/intevents/v1" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/intevents" "github.com/google/knative-gcp/pkg/reconciler/intevents/topic/resources" + reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub" ) const ( @@ -83,7 +83,7 @@ type Reconciler struct { // createClientFn is the function used to create the Pub/Sub client that interacts with Pub/Sub. // This is needed so that we can inject a mock client for UTs purposes. - createClientFn gpubsub.CreateFn + createClientFn reconcilerutilspubsub.CreateFn } // Check that our Reconciler implements Interface. @@ -204,7 +204,6 @@ func (r *Reconciler) deleteTopic(ctx context.Context, topic *v1.Topic) error { return err } defer client.Close() - t := client.Topic(topic.Status.TopicID) exists, err := t.Exists(ctx) if err != nil { diff --git a/pkg/reconciler/intevents/topic/topic_test.go b/pkg/reconciler/intevents/topic/topic_test.go index c82df09670..dc1e6d11d7 100644 --- a/pkg/reconciler/intevents/topic/topic_test.go +++ b/pkg/reconciler/intevents/topic/topic_test.go @@ -18,12 +18,18 @@ package topic import ( "context" - "errors" "fmt" "strings" "testing" + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/pstest" + reconcilertestingv1 "github.com/google/knative-gcp/pkg/reconciler/testing/v1" + reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,7 +48,6 @@ import ( pubsubv1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/topic" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/intevents" "github.com/google/knative-gcp/pkg/reconciler/intevents/topic/resources" @@ -149,9 +154,7 @@ func TestAllCases(t *testing.T) { }, Key: testNS + "/" + topicName, OtherTestData: map[string]interface{}{ - "topic": gpubsub.TestClientData{ - CreateClientErr: errors.New("create-client-induced-error"), - }, + "client-error": "create-client-induced-error", }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), @@ -194,15 +197,12 @@ func TestAllCases(t *testing.T) { }, Key: testNS + "/" + topicName, OtherTestData: map[string]interface{}{ - "topic": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - ExistsErr: errors.New("topic-exists-induced-error"), - }, - }, + // GetTopic has a retry policy for Unknown status type, so we use Internal error instead. + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("GetTopic", codes.Internal, "Injected error")}, }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeWarning, reconciledTopicFailedReason, "Failed to reconcile Pub/Sub topic: topic-exists-induced-error"), + Eventf(corev1.EventTypeWarning, reconciledTopicFailedReason, "Failed to reconcile Pub/Sub topic: rpc error: code = Internal desc = Injected error"), }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, topicName, resourceGroup), @@ -219,7 +219,7 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithTopicPropagationPolicy("NoCreateNoDelete"), // Updates reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicNoTopic("TopicReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileTopicMsg, "topic-exists-induced-error")), + reconcilertestingv1.WithTopicNoTopic("TopicReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileTopicMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithTopicSetDefaults, ), }}, @@ -263,6 +263,10 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithTopicSetDefaults, ), }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + NoTopicsExist(), + }, }, { Name: "create topic fails", Objects: []runtime.Object{ @@ -282,12 +286,10 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + topicName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeWarning, reconciledTopicFailedReason, "Failed to reconcile Pub/Sub topic: create-topic-induced-error"), + Eventf(corev1.EventTypeWarning, reconciledTopicFailedReason, "Failed to reconcile Pub/Sub topic: rpc error: code = Unknown desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "topic": gpubsub.TestClientData{ - CreateTopicErr: errors.New("create-topic-induced-error"), - }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("CreateTopic", codes.Unknown, "Injected error")}, }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, topicName, resourceGroup), @@ -304,10 +306,13 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), // Updates reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicNoTopic("TopicReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileTopicMsg, "create-topic-induced-error")), + reconcilertestingv1.WithTopicNoTopic("TopicReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileTopicMsg, "rpc error: code = Unknown desc = Injected error")), reconcilertestingv1.WithTopicSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + NoTopicsExist(), + }, }, { Name: "topic created with EnablePublisher = false", Objects: []runtime.Object{ @@ -350,6 +355,10 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithTopicSetDefaults, ), }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, }, { Name: "publisher has not yet been reconciled", Objects: []runtime.Object{ @@ -394,272 +403,350 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithTopicSetDefaults, ), }}, - }, - { - Name: "the status of publisher is false", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - }, - Key: testNS + "/" + topicName, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, topicName, resourceGroup), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), - }, - WithReactors: []clientgotesting.ReactionFunc{ - ProvideResource("create", "services", makeFalseStatusPublisher("PublisherNotDeployed", "PublisherNotDeployed")), - }, - WantCreates: []runtime.Object{ - newPublisher(), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicProjectID(testProject), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - // Updates - reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), - reconcilertestingv1.WithTopicPublisherNotDeployed("PublisherNotDeployed", "PublisherNotDeployed"), - reconcilertestingv1.WithTopicSetDefaults, - ), - }}, - }, { - Name: "the status of publisher is unknown", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - }, - Key: testNS + "/" + topicName, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, topicName, resourceGroup), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), - }, - WithReactors: []clientgotesting.ReactionFunc{ - ProvideResource("create", "services", makeUnknownStatusPublisher("PublisherUnknown", "PublisherUnknown")), - }, - WantCreates: []runtime.Object{ - newPublisher(), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicProjectID(testProject), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - // Updates - reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), - reconcilertestingv1.WithTopicPublisherUnknown("PublisherUnknown", "PublisherUnknown"), - reconcilertestingv1.WithTopicSetDefaults, - ), - }}, - }, { - Name: "topic successfully reconciles and is ready", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - }, - Key: testNS + "/" + topicName, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, topicName, resourceGroup), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), - }, - WithReactors: []clientgotesting.ReactionFunc{ - ProvideResource("create", "services", makeReadyPublisher()), - }, - WantCreates: []runtime.Object{ - newPublisher(), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicProjectID(testProject), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - // Updates - reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), - reconcilertestingv1.WithTopicPublisherDeployed, - reconcilertestingv1.WithTopicAddress(testTopicURI), - reconcilertestingv1.WithTopicSetDefaults, - ), - }}, - }, { - Name: "topic successfully reconciles and reuses existing publisher", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - makeReadyPublisher(), - NewService(topicName+"-topic", testNS, - WithServiceOwnerReferences(ownerReferences()), - WithServiceLabels(resources.GetLabels(controllerAgentName, topicName)), - WithServicePorts(servicePorts())), - }, - Key: testNS + "/" + topicName, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, topicName, resourceGroup), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), - }, - WithReactors: []clientgotesting.ReactionFunc{}, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicProjectID(testProject), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - // Updates - reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), - reconcilertestingv1.WithTopicPublisherDeployed, - reconcilertestingv1.WithTopicAddress(testTopicURI), - reconcilertestingv1.WithTopicSetDefaults, - ), - }}, - }, { - Name: "delete topic - policy CreateNoDelete", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - reconcilertestingv1.WithTopicDeleted, - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - }, - Key: testNS + "/" + topicName, - WantEvents: nil, - WantStatusUpdates: nil, - }, { - Name: "delete topic - policy CreateDelete", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateDelete"), - reconcilertestingv1.WithTopicTopicID(topicName), - reconcilertestingv1.WithTopicDeleted, - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - }, - Key: testNS + "/" + topicName, - WantEvents: nil, - WantStatusUpdates: nil, - }, { - Name: "fail to delete - policy CreateDelete", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateDelete"), - reconcilertestingv1.WithTopicTopicID(topicName), - reconcilertestingv1.WithTopicDeleted, - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "the status of publisher is false", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + Key: testNS + "/" + topicName, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(testNS, topicName, resourceGroup), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), + Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), + }, + WithReactors: []clientgotesting.ReactionFunc{ + ProvideResource("create", "services", makeFalseStatusPublisher("PublisherNotDeployed", "PublisherNotDeployed")), + }, + WantCreates: []runtime.Object{ + newPublisher(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + // Updates + reconcilertestingv1.WithInitTopicConditions, + reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), + reconcilertestingv1.WithTopicPublisherNotDeployed("PublisherNotDeployed", "PublisherNotDeployed"), + reconcilertestingv1.WithTopicSetDefaults, + ), + }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "the status of publisher is unknown", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + Key: testNS + "/" + topicName, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(testNS, topicName, resourceGroup), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), + Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), + }, + WithReactors: []clientgotesting.ReactionFunc{ + ProvideResource("create", "services", makeUnknownStatusPublisher("PublisherUnknown", "PublisherUnknown")), + }, + WantCreates: []runtime.Object{ + newPublisher(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + // Updates + reconcilertestingv1.WithInitTopicConditions, + reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), + reconcilertestingv1.WithTopicPublisherUnknown("PublisherUnknown", "PublisherUnknown"), + reconcilertestingv1.WithTopicSetDefaults, + ), + }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "topic successfully reconciles and is ready", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + Key: testNS + "/" + topicName, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(testNS, topicName, resourceGroup), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), + Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), + }, + WithReactors: []clientgotesting.ReactionFunc{ + ProvideResource("create", "services", makeReadyPublisher()), + }, + WantCreates: []runtime.Object{ + newPublisher(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + // Updates + reconcilertestingv1.WithInitTopicConditions, + reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), + reconcilertestingv1.WithTopicPublisherDeployed, + reconcilertestingv1.WithTopicAddress(testTopicURI), + reconcilertestingv1.WithTopicSetDefaults, + ), + }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "topic successfully reconciles and reuses existing publisher", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + makeReadyPublisher(), + NewService(topicName+"-topic", testNS, + WithServiceOwnerReferences(ownerReferences()), + WithServiceLabels(resources.GetLabels(controllerAgentName, topicName)), + WithServicePorts(servicePorts())), + }, + Key: testNS + "/" + topicName, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(testNS, topicName, resourceGroup), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), + Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), + }, + WithReactors: []clientgotesting.ReactionFunc{}, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + // Updates + reconcilertestingv1.WithInitTopicConditions, + reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), + reconcilertestingv1.WithTopicPublisherDeployed, + reconcilertestingv1.WithTopicAddress(testTopicURI), + reconcilertestingv1.WithTopicSetDefaults, + ), + }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "delete topic - policy CreateNoDelete", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + reconcilertestingv1.WithTopicTopicID(testTopicID), + reconcilertestingv1.WithTopicDeleted, + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + OtherTestData: map[string]interface{}{ + "pre": []PubsubAction{ + Topic(testTopicID), }, - Key: testNS + "/" + topicName, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, deleteTopicFailed, "Failed to delete Pub/Sub topic: delete-topic-induced-error"), + }, + Key: testNS + "/" + topicName, + WantEvents: nil, + WantStatusUpdates: nil, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "delete topic - policy CreateDelete", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateDelete"), + reconcilertestingv1.WithTopicTopicID(testTopicID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicDeleted, + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + OtherTestData: map[string]interface{}{ + "pre": []PubsubAction{ + Topic(testTopicID), }, - OtherTestData: map[string]interface{}{ - "topic": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - DeleteErr: errors.New("delete-topic-induced-error"), - }, - }, + }, + Key: testNS + "/" + topicName, + WantEvents: nil, + WantStatusUpdates: nil, + PostConditions: []func(*testing.T, *TableRow){ + NoTopicsExist(), + }, + }, { + Name: "fail to delete - policy CreateDelete", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateDelete"), + reconcilertestingv1.WithTopicTopicID(testTopicID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicDeleted, + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + Key: testNS + "/" + topicName, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, deleteTopicFailed, "Failed to delete Pub/Sub topic: rpc error: code = Unknown desc = Injected error"), + }, + OtherTestData: map[string]interface{}{ + "pre": []PubsubAction{ + Topic(testTopicID), }, - WantStatusUpdates: nil, - }} + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("DeleteTopic", codes.Unknown, "Injected error")}, + }, + WantStatusUpdates: nil, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, + } table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, testData map[string]interface{}) controller.Reconciler { + // Insert pubsub client for PostConditions and create fixtures + opts := []pstest.ServerReactorOption{} + if testData != nil { + if testData["server-options"] != nil { + opts = testData["server-options"].([]pstest.ServerReactorOption) + } + } + + srv := pstest.NewServer(opts...) + + psclient, _ := GetTestClientCreateFunc(srv.Addr)(ctx, testProject) + conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) + if err != nil { + panic(fmt.Errorf("failed to dial test pubsub connection: %v", err)) + } + close := func() { + srv.Close() + conn.Close() + } + t.Cleanup(close) + if testData != nil { + InjectPubsubClient(testData, psclient) + if testData["pre"] != nil { + fixtures := testData["pre"].([]PubsubAction) + for _, f := range fixtures { + f(ctx, t, psclient) + } + } + } + // use normal create function or always error one + var createClientFn reconcilerutilspubsub.CreateFn + if testData != nil && testData["client-error"] != nil { + createClientFn = func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) { + return nil, fmt.Errorf(testData["client-error"].(string)) + } + } else { + createClientFn = GetTestClientCreateFunc(srv.Addr) + } pubsubBase := &intevents.PubSubBase{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), } @@ -668,7 +755,7 @@ func TestAllCases(t *testing.T) { topicLister: listers.GetTopicLister(), serviceLister: listers.GetV1ServiceLister(), publisherImage: testImage, - createClientFn: gpubsub.TestClientCreator(testData["topic"]), + createClientFn: createClientFn, } return topic.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetTopicLister(), r.Recorder, r) })) diff --git a/pkg/reconciler/testing/pstest.go b/pkg/reconciler/testing/pstest.go index fea84c53db..82400d70ef 100644 --- a/pkg/reconciler/testing/pstest.go +++ b/pkg/reconciler/testing/pstest.go @@ -197,19 +197,34 @@ func getPubsubClient(r *rtesting.TableRow) *pubsub.Client { return r.OtherTestData["_psclient"].(*pubsub.Client) } -func TestPubsubClient(ctx context.Context, projectID string) (*pubsub.Client, func()) { - srv := pstest.NewServer() - conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) - if err != nil { - panic(fmt.Errorf("failed to dial test pubsub connection: %v", err)) - } +func TestPubsubClient(ctx context.Context, projectID string, opts ...pstest.ServerReactorOption) (*pubsub.Client, func()) { + srv := pstest.NewServer(opts...) + client, _ := GetTestClientCreateFunc(srv.Addr)(ctx, projectID) close := func() { srv.Close() - conn.Close() + client.Close() } - c, err := pubsub.NewClient(context.Background(), projectID, option.WithGRPCConn(conn)) - if err != nil { - panic(fmt.Errorf("failed to create test pubsub client: %v", err)) + return client, close +} + +// GetTestClientCreateFunc returns a client creation function with same type as pubsub.NewClient. With +// this helper function, multiple clients can be created. This is necessary for any test involving +// mulitple projects. Eg. in sources multiple project is allowed for topics. +func GetTestClientCreateFunc(target string) func(context.Context, string, ...option.ClientOption) (*pubsub.Client, error) { + return func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) { + newConn, err := grpc.Dial(target, grpc.WithInsecure()) + if err != nil { + panic(fmt.Errorf("failed to dial test pubsub connection: %v", err)) + } + // Connection cleanup + go func() { + <-ctx.Done() + newConn.Close() + }() + c, err := pubsub.NewClient(context.Background(), projectID, option.WithGRPCConn(newConn)) + if err != nil { + panic(fmt.Errorf("failed to create test pubsub client: %v", err)) + } + return c, nil } - return c, close } diff --git a/pkg/reconciler/utils/pubsub/types.go b/pkg/reconciler/utils/pubsub/types.go index e2455961bb..d5c19e1b01 100644 --- a/pkg/reconciler/utils/pubsub/types.go +++ b/pkg/reconciler/utils/pubsub/types.go @@ -17,10 +17,17 @@ limitations under the License. package pubsub import ( + "context" + "cloud.google.com/go/pubsub" + "google.golang.org/api/option" "k8s.io/client-go/tools/record" ) +// CreateFn is a factory function to create a Pub/Sub client. It is copied from gclient/pubsub +// to avoid unnecessary dependency on gpubsub. +type CreateFn func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) + type Reconciler struct { client *pubsub.Client recorder record.EventRecorder