diff --git a/pkg/reconciler/intevents/pullsubscription/keda/controller.go b/pkg/reconciler/intevents/pullsubscription/keda/controller.go index 2e04fb26f3..64fc57e6b9 100644 --- a/pkg/reconciler/intevents/pullsubscription/keda/controller.go +++ b/pkg/reconciler/intevents/pullsubscription/keda/controller.go @@ -21,6 +21,7 @@ import ( "knative.dev/pkg/injection" + "cloud.google.com/go/pubsub" "go.uber.org/zap" "k8s.io/client-go/discovery" "k8s.io/client-go/tools/cache" @@ -31,7 +32,6 @@ import ( "github.com/google/knative-gcp/pkg/client/injection/ducks/duck/v1/resource" pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1/pullsubscription" pullsubscriptionreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/pullsubscription" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" @@ -104,7 +104,7 @@ func newController( DeploymentLister: deploymentInformer.Lister(), PullSubscriptionLister: pullSubscriptionInformer.Lister(), ReceiveAdapterImage: env.ReceiveAdapter, - CreateClientFn: gpubsub.NewClient, + CreateClientFn: pubsub.NewClient, ControllerAgentName: controllerAgentName, ResourceGroup: resourceGroup, }, diff --git a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription_test.go b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription_test.go index ce75a7bd1c..acc9629b20 100644 --- a/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription_test.go +++ b/pkg/reconciler/intevents/pullsubscription/keda/pullsubscription_test.go @@ -18,12 +18,16 @@ package keda import ( "context" - "errors" "fmt" "strings" "testing" + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/pstest" reconcilertestingv1 "github.com/google/knative-gcp/pkg/reconciler/testing/v1" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -50,13 +54,13 @@ import ( pubsubv1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" "github.com/google/knative-gcp/pkg/client/injection/ducks/duck/v1/resource" "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/pullsubscription" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/intevents" psreconciler "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription" . "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/keda/resources" "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/resources" . "github.com/google/knative-gcp/pkg/reconciler/testing" + reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub" ) const ( @@ -277,9 +281,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: client-create-induced-error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - CreateClientErr: errors.New("client-create-induced-error"), - }, + "client-error": "client-create-induced-error", }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, @@ -332,14 +334,11 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: topic-exists-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - ExistsErr: errors.New("topic-exists-induced-error"), - }, - }, + // GetTopic has a retry policy for Unknown status type, so we use Internal error instead. + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("GetTopic", codes.Internal, "Injected error")}, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, @@ -360,13 +359,16 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "topic-exists-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, }, { Name: "topic does not exist", Objects: []runtime.Object{ @@ -394,13 +396,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: Topic %q does not exist", testTopicID), }, - OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: false, - }, - }, - }, + OtherTestData: map[string]interface{}{}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, reconcilertestingv1.WithPullSubscriptionUID(sourceUID), @@ -427,6 +423,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, }, { Name: "subscription exists fails", Objects: []runtime.Object{ @@ -452,14 +451,10 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: subscription-exists-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - SubscriptionData: gpubsub.TestSubscriptionData{ - ExistsErr: errors.New("subscription-exists-induced-error"), - }, - }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("GetSubscription", codes.Internal, "Injected error")}, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, @@ -480,7 +475,7 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "subscription-exists-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, @@ -512,15 +507,13 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: subscription-create-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - CreateSubscriptionErr: errors.New("subscription-create-induced-error"), + "pre": []PubsubAction{ + Topic(testTopicID), }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("CreateSubscription", codes.Internal, "Injected error")}, }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, @@ -541,7 +534,7 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "subscription-create-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, @@ -561,10 +554,8 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "PullSubscriptionReconciled", `PullSubscription reconciled: "%s/%s"`, testNS, sourceName), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, WantCreates: []runtime.Object{ @@ -599,6 +590,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "successful create - reuse existing receive adapter - match", Objects: []runtime.Object{ @@ -621,10 +615,8 @@ func TestAllCases(t *testing.T) { newAvailableReceiveAdapter(context.Background(), testImage, nil), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, WantCreates: []runtime.Object{ @@ -662,6 +654,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "successful create - reuse existing receive adapter - mismatch", Objects: []runtime.Object{ @@ -689,10 +684,8 @@ func TestAllCases(t *testing.T) { newScaledObject(newPullSubscription(testSubscriptionID)), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -735,6 +728,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "deleting - failed to delete subscription", Objects: []runtime.Object{ @@ -753,25 +749,21 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSubscribed(testSubscriptionID), reconcilertestingv1.WithPullSubscriptionMarkDeployed(deploymentName(testSubscriptionID), testNS), reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), + reconcilertestingv1.WithPullSubscriptionProjectID(testProject), reconcilertestingv1.WithPullSubscriptionDeleted, reconcilertestingv1.WithPullSubscriptionSetDefaults, ), newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - SubscriptionData: gpubsub.TestSubscriptionData{ - Exists: true, - DeleteErr: errors.New("subscription-delete-induced-error"), - }, + "pre": []PubsubAction{ + TopicAndSub(testTopicID, testSubscriptionID), }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("DeleteSubscription", codes.Unknown, "Injected error")}, }, Key: testNS + "/" + sourceName, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "SubscriptionDeleteFailed", "Failed to delete Pub/Sub subscription: subscription-delete-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionDeleteFailed", "Failed to delete Pub/Sub subscription: rpc error: code = Unknown desc = Injected error"), }, WantStatusUpdates: nil, }, { @@ -797,12 +789,13 @@ func TestAllCases(t *testing.T) { newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), @@ -862,10 +855,8 @@ func TestAllCases(t *testing.T) { newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -907,6 +898,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "update receiver adapter fails", Objects: []runtime.Object{ @@ -931,10 +925,8 @@ func TestAllCases(t *testing.T) { newReceiveAdapter(context.Background(), "old"+testImage, nil), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -981,6 +973,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "successfully deleted subscription", Objects: []runtime.Object{ @@ -999,22 +994,20 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSubscribed(testSubscriptionID), reconcilertestingv1.WithPullSubscriptionMarkDeployed(deploymentName(testSubscriptionID), testNS), reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), - reconcilertestingv1.WithPullSubscriptionSubscriptionID(""), + reconcilertestingv1.WithPullSubscriptionProjectID(testProject), reconcilertestingv1.WithPullSubscriptionDeleted, reconcilertestingv1.WithPullSubscriptionSetDefaults, ), newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - SubscriptionData: gpubsub.TestSubscriptionData{ - Exists: true, - }, + "pre": []PubsubAction{ + TopicAndSub(testTopicID, testSubscriptionID), }, }, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, Key: testNS + "/" + sourceName, WantEvents: nil, WantStatusUpdates: nil, @@ -1023,6 +1016,40 @@ func TestAllCases(t *testing.T) { table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, testData map[string]interface{}) controller.Reconciler { ctx = addressable.WithDuck(ctx) ctx = resource.WithDuck(ctx) + opts := []pstest.ServerReactorOption{} + if testData != nil && testData["server-options"] != nil { + opts = testData["server-options"].([]pstest.ServerReactorOption) + } + srv := pstest.NewServer(opts...) + + psclient, _ := GetTestClientCreateFunc(srv.Addr)(ctx, testProject) + conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) + if err != nil { + panic(fmt.Errorf("failed to dial test pubsub connection: %v", err)) + } + close := func() { + srv.Close() + conn.Close() + } + t.Cleanup(close) + if testData != nil { + InjectPubsubClient(testData, psclient) + if testData["pre"] != nil { + fixtures := testData["pre"].([]PubsubAction) + for _, f := range fixtures { + f(ctx, t, psclient) + } + } + } + // use normal create function or always error one + var createClientFn reconcilerutilspubsub.CreateFn + if testData != nil && testData["client-error"] != nil { + createClientFn = func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) { + return nil, fmt.Errorf(testData["client-error"].(string)) + } + } else { + createClientFn = GetTestClientCreateFunc(srv.Addr) + } pubsubBase := &intevents.PubSubBase{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), } @@ -1033,7 +1060,7 @@ func TestAllCases(t *testing.T) { PullSubscriptionLister: listers.GetPullSubscriptionLister(), UriResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), ReceiveAdapterImage: testImage, - CreateClientFn: gpubsub.TestClientCreator(testData["ps"]), + CreateClientFn: createClientFn, ControllerAgentName: controllerAgentName, ResourceGroup: resourceGroup, }, diff --git a/pkg/reconciler/intevents/pullsubscription/reconciler.go b/pkg/reconciler/intevents/pullsubscription/reconciler.go index fa5625503b..fae07d6d95 100644 --- a/pkg/reconciler/intevents/pullsubscription/reconciler.go +++ b/pkg/reconciler/intevents/pullsubscription/reconciler.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "cloud.google.com/go/pubsub" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -44,10 +45,10 @@ import ( v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" listers "github.com/google/knative-gcp/pkg/client/listers/intevents/v1" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/intevents" "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/resources" + reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub" "github.com/google/knative-gcp/pkg/tracing" ) @@ -92,7 +93,7 @@ type Base struct { // CreateClientFn is the function used to create the Pub/Sub client that interacts with Pub/Sub. // This is needed so that we can inject a mock client for UTs purposes. - CreateClientFn gpubsub.CreateFn + CreateClientFn reconcilerutilspubsub.CreateFn // ReconcileDataPlaneFn is the function used to reconcile the data plane resources. ReconcileDataPlaneFn ReconcileDataPlaneFunc @@ -196,7 +197,7 @@ func (r *Base) reconcileSubscription(ctx context.Context, ps *v1.PullSubscriptio } // subConfig is the wanted config based on settings. - subConfig := gpubsub.SubscriptionConfig{ + subConfig := pubsub.SubscriptionConfig{ Topic: t, RetainAckedMessages: ps.Spec.RetainAckedMessages, } diff --git a/pkg/reconciler/intevents/pullsubscription/static/controller.go b/pkg/reconciler/intevents/pullsubscription/static/controller.go index 3eb52d0b91..6d6a5dc91f 100644 --- a/pkg/reconciler/intevents/pullsubscription/static/controller.go +++ b/pkg/reconciler/intevents/pullsubscription/static/controller.go @@ -21,11 +21,11 @@ import ( "knative.dev/pkg/injection" + "cloud.google.com/go/pubsub" "github.com/google/knative-gcp/pkg/apis/configs/gcpauth" "github.com/google/knative-gcp/pkg/apis/duck" v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1/pullsubscription" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" @@ -100,7 +100,7 @@ func newController( DeploymentLister: deploymentInformer.Lister(), PullSubscriptionLister: pullSubscriptionInformer.Lister(), ReceiveAdapterImage: env.ReceiveAdapter, - CreateClientFn: gpubsub.NewClient, + CreateClientFn: pubsub.NewClient, ControllerAgentName: controllerAgentName, ResourceGroup: resourceGroup, }, diff --git a/pkg/reconciler/intevents/pullsubscription/static/pullsubscription_test.go b/pkg/reconciler/intevents/pullsubscription/static/pullsubscription_test.go index 3ec445aade..67466997d0 100644 --- a/pkg/reconciler/intevents/pullsubscription/static/pullsubscription_test.go +++ b/pkg/reconciler/intevents/pullsubscription/static/pullsubscription_test.go @@ -18,12 +18,16 @@ package static import ( "context" - "errors" "fmt" "strings" "testing" + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/pstest" reconcilertestingv1 "github.com/google/knative-gcp/pkg/reconciler/testing/v1" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" v1 "k8s.io/api/apps/v1" @@ -49,12 +53,12 @@ import ( gcpduckv1 "github.com/google/knative-gcp/pkg/apis/duck/v1" pubsubv1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/pullsubscription" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/intevents" psreconciler "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription" "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/resources" . "github.com/google/knative-gcp/pkg/reconciler/testing" + reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub" ) const ( @@ -251,9 +255,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: client-create-induced-error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - CreateClientErr: errors.New("client-create-induced-error"), - }, + "client-error": "client-create-induced-error", }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilertestingv1.NewPullSubscription(sourceName, testNS, @@ -304,14 +306,11 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: topic-exists-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - ExistsErr: errors.New("topic-exists-induced-error"), - }, - }, + // GetTopic has a retry policy for Unknown status type, so we use Internal error instead. + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("GetTopic", codes.Internal, "Injected error")}, }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), @@ -334,10 +333,13 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "topic-exists-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, }, { Name: "topic does not exist", Objects: []runtime.Object{ @@ -364,13 +366,7 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: Topic %q does not exist", testTopicID), }, - OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: false, - }, - }, - }, + OtherTestData: map[string]interface{}{}, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, @@ -396,6 +392,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, }, { Name: "subscription exists fails", Objects: []runtime.Object{ @@ -420,14 +419,10 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: subscription-exists-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - SubscriptionData: gpubsub.TestSubscriptionData{ - ExistsErr: errors.New("subscription-exists-induced-error"), - }, - }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("GetSubscription", codes.Internal, "Injected error")}, }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), @@ -450,7 +445,7 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "subscription-exists-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, @@ -478,15 +473,13 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + sourceName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName), - Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: subscription-create-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionReconcileFailed", "Failed to reconcile Pub/Sub subscription: rpc error: code = Internal desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - CreateSubscriptionErr: errors.New("subscription-create-induced-error"), + "pre": []PubsubAction{ + Topic(testTopicID), }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("CreateSubscription", codes.Internal, "Injected error")}, }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), @@ -509,7 +502,7 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), reconcilertestingv1.WithPullSubscriptionMarkNoTransformer("TransformerNil", "Transformer is nil"), reconcilertestingv1.WithPullSubscriptionTransformerURI(nil), - reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "subscription-create-induced-error")), + reconcilertestingv1.WithPullSubscriptionMarkNoSubscription("SubscriptionReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileSubscriptionMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, @@ -540,10 +533,8 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "PullSubscriptionReconciled", `PullSubscription reconciled: "%s/%s"`, testNS, sourceName), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, WantCreates: []runtime.Object{ @@ -576,6 +567,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "sink namespace empty, default to the source one", Objects: []runtime.Object{ @@ -606,10 +600,8 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "PullSubscriptionReconciled", `PullSubscription reconciled: "%s/%s"`, testNS, sourceName), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, WantCreates: []runtime.Object{ @@ -642,6 +634,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "sink URI set instead of ref", Objects: []runtime.Object{ @@ -674,10 +669,8 @@ func TestAllCases(t *testing.T) { Eventf(corev1.EventTypeNormal, "PullSubscriptionReconciled", `PullSubscription reconciled: "%s/%s"`, testNS, sourceName), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, WantCreates: []runtime.Object{ @@ -710,6 +703,9 @@ func TestAllCases(t *testing.T) { WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, resourceGroup), }, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "successful create - reuse existing receive adapter - match", Objects: []runtime.Object{ @@ -731,10 +727,8 @@ func TestAllCases(t *testing.T) { newAvailableReceiveAdapter(context.Background(), testImage, nil), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -768,6 +762,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "successful create - reuse existing receive adapter - mismatch", Objects: []runtime.Object{ @@ -791,10 +788,8 @@ func TestAllCases(t *testing.T) { newReceiveAdapter(context.Background(), "old"+testImage, nil), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -837,6 +832,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "get existing receiver adapter fails", Objects: []runtime.Object{ @@ -859,10 +857,8 @@ func TestAllCases(t *testing.T) { newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -899,6 +895,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "create receiver adapter fails", Objects: []runtime.Object{ @@ -921,10 +920,8 @@ func TestAllCases(t *testing.T) { newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -964,6 +961,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "update receiver adapter fails", Objects: []runtime.Object{ @@ -987,10 +987,8 @@ func TestAllCases(t *testing.T) { newReceiveAdapter(context.Background(), "old"+testImage, nil), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, + "pre": []PubsubAction{ + Topic(testTopicID), }, }, Key: testNS + "/" + sourceName, @@ -1035,6 +1033,9 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + OnlySubscriptions(testSubscriptionID), + }, }, { Name: "deleting - failed to delete subscription", Objects: []runtime.Object{ @@ -1052,25 +1053,21 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSubscribed(testSubscriptionID), reconcilertestingv1.WithPullSubscriptionMarkDeployed(deploymentName(), testNS), reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), + reconcilertestingv1.WithPullSubscriptionProjectID(testProject), reconcilertestingv1.WithPullSubscriptionDeleted, reconcilertestingv1.WithPullSubscriptionSetDefaults, ), newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - SubscriptionData: gpubsub.TestSubscriptionData{ - Exists: true, - DeleteErr: errors.New("subscription-delete-induced-error"), - }, + "pre": []PubsubAction{ + TopicAndSub(testTopicID, testSubscriptionID), }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("DeleteSubscription", codes.Unknown, "Injected error")}, }, Key: testNS + "/" + sourceName, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "SubscriptionDeleteFailed", "Failed to delete Pub/Sub subscription: subscription-delete-induced-error"), + Eventf(corev1.EventTypeWarning, "SubscriptionDeleteFailed", "Failed to delete Pub/Sub subscription: rpc error: code = Unknown desc = Injected error"), }, WantStatusUpdates: nil, }, { @@ -1091,27 +1088,61 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithPullSubscriptionMarkSubscribed(testSubscriptionID), reconcilertestingv1.WithPullSubscriptionMarkDeployed(deploymentName(), testNS), reconcilertestingv1.WithPullSubscriptionMarkSink(sinkURI), + reconcilertestingv1.WithPullSubscriptionProjectID(testProject), reconcilertestingv1.WithPullSubscriptionDeleted, reconcilertestingv1.WithPullSubscriptionSetDefaults, ), newSecret(), }, OtherTestData: map[string]interface{}{ - "ps": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - }, - SubscriptionData: gpubsub.TestSubscriptionData{ - Exists: true, - }, + "pre": []PubsubAction{ + TopicAndSub(testTopicID, testSubscriptionID), }, }, + PostConditions: []func(*testing.T, *TableRow){ + NoSubscriptionsExist(), + }, Key: testNS + "/" + sourceName, WantEvents: nil, }} table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, testData map[string]interface{}) controller.Reconciler { ctx = addressable.WithDuck(ctx) + opts := []pstest.ServerReactorOption{} + if testData != nil && testData["server-options"] != nil { + opts = testData["server-options"].([]pstest.ServerReactorOption) + } + + srv := pstest.NewServer(opts...) + + psclient, _ := GetTestClientCreateFunc(srv.Addr)(ctx, testProject) + conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) + if err != nil { + panic(fmt.Errorf("failed to dial test pubsub connection: %v", err)) + } + close := func() { + srv.Close() + conn.Close() + } + t.Cleanup(close) + if testData != nil { + InjectPubsubClient(testData, psclient) + if testData["pre"] != nil { + fixtures := testData["pre"].([]PubsubAction) + for _, f := range fixtures { + f(ctx, t, psclient) + } + } + } + // use normal create function or always error one + var createClientFn reconcilerutilspubsub.CreateFn + if testData != nil && testData["client-error"] != nil { + createClientFn = func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) { + return nil, fmt.Errorf(testData["client-error"].(string)) + } + } else { + createClientFn = GetTestClientCreateFunc(srv.Addr) + } pubsubBase := &intevents.PubSubBase{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), } @@ -1122,7 +1153,7 @@ func TestAllCases(t *testing.T) { PullSubscriptionLister: listers.GetPullSubscriptionLister(), UriResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), ReceiveAdapterImage: testImage, - CreateClientFn: gpubsub.TestClientCreator(testData["ps"]), + CreateClientFn: createClientFn, ControllerAgentName: controllerAgentName, ResourceGroup: resourceGroup, }, diff --git a/pkg/reconciler/intevents/topic/controller.go b/pkg/reconciler/intevents/topic/controller.go index 9acc7e0daa..8971b52fb3 100644 --- a/pkg/reconciler/intevents/topic/controller.go +++ b/pkg/reconciler/intevents/topic/controller.go @@ -29,10 +29,10 @@ import ( "knative.dev/pkg/logging" tracingconfig "knative.dev/pkg/tracing/config" + "cloud.google.com/go/pubsub" "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" "github.com/google/knative-gcp/pkg/apis/configs/gcpauth" v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" @@ -96,7 +96,7 @@ func newController( topicLister: topicInformer.Lister(), serviceLister: serviceInformer.Lister(), publisherImage: env.Publisher, - createClientFn: gpubsub.NewClient, + createClientFn: pubsub.NewClient, } impl := topicreconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/intevents/topic/topic.go b/pkg/reconciler/intevents/topic/topic.go index fe135ff233..a08480b4c7 100644 --- a/pkg/reconciler/intevents/topic/topic.go +++ b/pkg/reconciler/intevents/topic/topic.go @@ -47,10 +47,10 @@ import ( v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" topicreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/topic" listers "github.com/google/knative-gcp/pkg/client/listers/intevents/v1" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/intevents" "github.com/google/knative-gcp/pkg/reconciler/intevents/topic/resources" + reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub" ) const ( @@ -83,7 +83,7 @@ type Reconciler struct { // createClientFn is the function used to create the Pub/Sub client that interacts with Pub/Sub. // This is needed so that we can inject a mock client for UTs purposes. - createClientFn gpubsub.CreateFn + createClientFn reconcilerutilspubsub.CreateFn } // Check that our Reconciler implements Interface. @@ -204,7 +204,6 @@ func (r *Reconciler) deleteTopic(ctx context.Context, topic *v1.Topic) error { return err } defer client.Close() - t := client.Topic(topic.Status.TopicID) exists, err := t.Exists(ctx) if err != nil { diff --git a/pkg/reconciler/intevents/topic/topic_test.go b/pkg/reconciler/intevents/topic/topic_test.go index c82df09670..dc1e6d11d7 100644 --- a/pkg/reconciler/intevents/topic/topic_test.go +++ b/pkg/reconciler/intevents/topic/topic_test.go @@ -18,12 +18,18 @@ package topic import ( "context" - "errors" "fmt" "strings" "testing" + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/pstest" + reconcilertestingv1 "github.com/google/knative-gcp/pkg/reconciler/testing/v1" + reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,7 +48,6 @@ import ( pubsubv1 "github.com/google/knative-gcp/pkg/apis/intevents/v1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/topic" - gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/intevents" "github.com/google/knative-gcp/pkg/reconciler/intevents/topic/resources" @@ -149,9 +154,7 @@ func TestAllCases(t *testing.T) { }, Key: testNS + "/" + topicName, OtherTestData: map[string]interface{}{ - "topic": gpubsub.TestClientData{ - CreateClientErr: errors.New("create-client-induced-error"), - }, + "client-error": "create-client-induced-error", }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), @@ -194,15 +197,12 @@ func TestAllCases(t *testing.T) { }, Key: testNS + "/" + topicName, OtherTestData: map[string]interface{}{ - "topic": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - ExistsErr: errors.New("topic-exists-induced-error"), - }, - }, + // GetTopic has a retry policy for Unknown status type, so we use Internal error instead. + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("GetTopic", codes.Internal, "Injected error")}, }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeWarning, reconciledTopicFailedReason, "Failed to reconcile Pub/Sub topic: topic-exists-induced-error"), + Eventf(corev1.EventTypeWarning, reconciledTopicFailedReason, "Failed to reconcile Pub/Sub topic: rpc error: code = Internal desc = Injected error"), }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, topicName, resourceGroup), @@ -219,7 +219,7 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithTopicPropagationPolicy("NoCreateNoDelete"), // Updates reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicNoTopic("TopicReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileTopicMsg, "topic-exists-induced-error")), + reconcilertestingv1.WithTopicNoTopic("TopicReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileTopicMsg, "rpc error: code = Internal desc = Injected error")), reconcilertestingv1.WithTopicSetDefaults, ), }}, @@ -263,6 +263,10 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithTopicSetDefaults, ), }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + NoTopicsExist(), + }, }, { Name: "create topic fails", Objects: []runtime.Object{ @@ -282,12 +286,10 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + topicName, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeWarning, reconciledTopicFailedReason, "Failed to reconcile Pub/Sub topic: create-topic-induced-error"), + Eventf(corev1.EventTypeWarning, reconciledTopicFailedReason, "Failed to reconcile Pub/Sub topic: rpc error: code = Unknown desc = Injected error"), }, OtherTestData: map[string]interface{}{ - "topic": gpubsub.TestClientData{ - CreateTopicErr: errors.New("create-topic-induced-error"), - }, + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("CreateTopic", codes.Unknown, "Injected error")}, }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, topicName, resourceGroup), @@ -304,10 +306,13 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), // Updates reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicNoTopic("TopicReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileTopicMsg, "create-topic-induced-error")), + reconcilertestingv1.WithTopicNoTopic("TopicReconcileFailed", fmt.Sprintf("%s: %s", failedToReconcileTopicMsg, "rpc error: code = Unknown desc = Injected error")), reconcilertestingv1.WithTopicSetDefaults, ), }}, + PostConditions: []func(*testing.T, *TableRow){ + NoTopicsExist(), + }, }, { Name: "topic created with EnablePublisher = false", Objects: []runtime.Object{ @@ -350,6 +355,10 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithTopicSetDefaults, ), }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, }, { Name: "publisher has not yet been reconciled", Objects: []runtime.Object{ @@ -394,272 +403,350 @@ func TestAllCases(t *testing.T) { reconcilertestingv1.WithTopicSetDefaults, ), }}, - }, - { - Name: "the status of publisher is false", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - }, - Key: testNS + "/" + topicName, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, topicName, resourceGroup), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), - }, - WithReactors: []clientgotesting.ReactionFunc{ - ProvideResource("create", "services", makeFalseStatusPublisher("PublisherNotDeployed", "PublisherNotDeployed")), - }, - WantCreates: []runtime.Object{ - newPublisher(), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicProjectID(testProject), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - // Updates - reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), - reconcilertestingv1.WithTopicPublisherNotDeployed("PublisherNotDeployed", "PublisherNotDeployed"), - reconcilertestingv1.WithTopicSetDefaults, - ), - }}, - }, { - Name: "the status of publisher is unknown", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - }, - Key: testNS + "/" + topicName, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, topicName, resourceGroup), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), - }, - WithReactors: []clientgotesting.ReactionFunc{ - ProvideResource("create", "services", makeUnknownStatusPublisher("PublisherUnknown", "PublisherUnknown")), - }, - WantCreates: []runtime.Object{ - newPublisher(), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicProjectID(testProject), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - // Updates - reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), - reconcilertestingv1.WithTopicPublisherUnknown("PublisherUnknown", "PublisherUnknown"), - reconcilertestingv1.WithTopicSetDefaults, - ), - }}, - }, { - Name: "topic successfully reconciles and is ready", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - }, - Key: testNS + "/" + topicName, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, topicName, resourceGroup), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), - }, - WithReactors: []clientgotesting.ReactionFunc{ - ProvideResource("create", "services", makeReadyPublisher()), - }, - WantCreates: []runtime.Object{ - newPublisher(), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicProjectID(testProject), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - // Updates - reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), - reconcilertestingv1.WithTopicPublisherDeployed, - reconcilertestingv1.WithTopicAddress(testTopicURI), - reconcilertestingv1.WithTopicSetDefaults, - ), - }}, - }, { - Name: "topic successfully reconciles and reuses existing publisher", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - makeReadyPublisher(), - NewService(topicName+"-topic", testNS, - WithServiceOwnerReferences(ownerReferences()), - WithServiceLabels(resources.GetLabels(controllerAgentName, topicName)), - WithServicePorts(servicePorts())), - }, - Key: testNS + "/" + topicName, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, topicName, resourceGroup), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), - Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), - }, - WithReactors: []clientgotesting.ReactionFunc{}, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicProjectID(testProject), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - // Updates - reconcilertestingv1.WithInitTopicConditions, - reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), - reconcilertestingv1.WithTopicPublisherDeployed, - reconcilertestingv1.WithTopicAddress(testTopicURI), - reconcilertestingv1.WithTopicSetDefaults, - ), - }}, - }, { - Name: "delete topic - policy CreateNoDelete", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), - reconcilertestingv1.WithTopicDeleted, - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - }, - Key: testNS + "/" + topicName, - WantEvents: nil, - WantStatusUpdates: nil, - }, { - Name: "delete topic - policy CreateDelete", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateDelete"), - reconcilertestingv1.WithTopicTopicID(topicName), - reconcilertestingv1.WithTopicDeleted, - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), - }, - Key: testNS + "/" + topicName, - WantEvents: nil, - WantStatusUpdates: nil, - }, { - Name: "fail to delete - policy CreateDelete", - Objects: []runtime.Object{ - reconcilertestingv1.NewTopic(topicName, testNS, - reconcilertestingv1.WithTopicUID(topicUID), - reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ - Project: testProject, - Topic: testTopicID, - Secret: &secret, - }), - reconcilertestingv1.WithTopicPropagationPolicy("CreateDelete"), - reconcilertestingv1.WithTopicTopicID(topicName), - reconcilertestingv1.WithTopicDeleted, - reconcilertestingv1.WithTopicSetDefaults, - ), - newSink(), - newSecret(), + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "the status of publisher is false", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + Key: testNS + "/" + topicName, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(testNS, topicName, resourceGroup), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), + Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), + }, + WithReactors: []clientgotesting.ReactionFunc{ + ProvideResource("create", "services", makeFalseStatusPublisher("PublisherNotDeployed", "PublisherNotDeployed")), + }, + WantCreates: []runtime.Object{ + newPublisher(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + // Updates + reconcilertestingv1.WithInitTopicConditions, + reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), + reconcilertestingv1.WithTopicPublisherNotDeployed("PublisherNotDeployed", "PublisherNotDeployed"), + reconcilertestingv1.WithTopicSetDefaults, + ), + }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "the status of publisher is unknown", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + Key: testNS + "/" + topicName, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(testNS, topicName, resourceGroup), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), + Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), + }, + WithReactors: []clientgotesting.ReactionFunc{ + ProvideResource("create", "services", makeUnknownStatusPublisher("PublisherUnknown", "PublisherUnknown")), + }, + WantCreates: []runtime.Object{ + newPublisher(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + // Updates + reconcilertestingv1.WithInitTopicConditions, + reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), + reconcilertestingv1.WithTopicPublisherUnknown("PublisherUnknown", "PublisherUnknown"), + reconcilertestingv1.WithTopicSetDefaults, + ), + }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "topic successfully reconciles and is ready", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + Key: testNS + "/" + topicName, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(testNS, topicName, resourceGroup), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), + Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), + }, + WithReactors: []clientgotesting.ReactionFunc{ + ProvideResource("create", "services", makeReadyPublisher()), + }, + WantCreates: []runtime.Object{ + newPublisher(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + // Updates + reconcilertestingv1.WithInitTopicConditions, + reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), + reconcilertestingv1.WithTopicPublisherDeployed, + reconcilertestingv1.WithTopicAddress(testTopicURI), + reconcilertestingv1.WithTopicSetDefaults, + ), + }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "topic successfully reconciles and reuses existing publisher", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + makeReadyPublisher(), + NewService(topicName+"-topic", testNS, + WithServiceOwnerReferences(ownerReferences()), + WithServiceLabels(resources.GetLabels(controllerAgentName, topicName)), + WithServicePorts(servicePorts())), + }, + Key: testNS + "/" + topicName, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(testNS, topicName, resourceGroup), + }, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", topicName), + Eventf(corev1.EventTypeNormal, reconciledSuccessReason, `Topic reconciled: "%s/%s"`, testNS, topicName), + }, + WithReactors: []clientgotesting.ReactionFunc{}, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + // Updates + reconcilertestingv1.WithInitTopicConditions, + reconcilertestingv1.WithTopicReadyAndPublisherDeployed(testTopicID), + reconcilertestingv1.WithTopicPublisherDeployed, + reconcilertestingv1.WithTopicAddress(testTopicURI), + reconcilertestingv1.WithTopicSetDefaults, + ), + }}, + OtherTestData: map[string]interface{}{}, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "delete topic - policy CreateNoDelete", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateNoDelete"), + reconcilertestingv1.WithTopicTopicID(testTopicID), + reconcilertestingv1.WithTopicDeleted, + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + OtherTestData: map[string]interface{}{ + "pre": []PubsubAction{ + Topic(testTopicID), }, - Key: testNS + "/" + topicName, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, deleteTopicFailed, "Failed to delete Pub/Sub topic: delete-topic-induced-error"), + }, + Key: testNS + "/" + topicName, + WantEvents: nil, + WantStatusUpdates: nil, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, { + Name: "delete topic - policy CreateDelete", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateDelete"), + reconcilertestingv1.WithTopicTopicID(testTopicID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicDeleted, + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + OtherTestData: map[string]interface{}{ + "pre": []PubsubAction{ + Topic(testTopicID), }, - OtherTestData: map[string]interface{}{ - "topic": gpubsub.TestClientData{ - TopicData: gpubsub.TestTopicData{ - Exists: true, - DeleteErr: errors.New("delete-topic-induced-error"), - }, - }, + }, + Key: testNS + "/" + topicName, + WantEvents: nil, + WantStatusUpdates: nil, + PostConditions: []func(*testing.T, *TableRow){ + NoTopicsExist(), + }, + }, { + Name: "fail to delete - policy CreateDelete", + Objects: []runtime.Object{ + reconcilertestingv1.NewTopic(topicName, testNS, + reconcilertestingv1.WithTopicUID(topicUID), + reconcilertestingv1.WithTopicSpec(pubsubv1.TopicSpec{ + Project: testProject, + Topic: testTopicID, + Secret: &secret, + }), + reconcilertestingv1.WithTopicPropagationPolicy("CreateDelete"), + reconcilertestingv1.WithTopicTopicID(testTopicID), + reconcilertestingv1.WithTopicProjectID(testProject), + reconcilertestingv1.WithTopicDeleted, + reconcilertestingv1.WithTopicSetDefaults, + ), + newSink(), + newSecret(), + }, + Key: testNS + "/" + topicName, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, deleteTopicFailed, "Failed to delete Pub/Sub topic: rpc error: code = Unknown desc = Injected error"), + }, + OtherTestData: map[string]interface{}{ + "pre": []PubsubAction{ + Topic(testTopicID), }, - WantStatusUpdates: nil, - }} + "server-options": []pstest.ServerReactorOption{pstest.WithErrorInjection("DeleteTopic", codes.Unknown, "Injected error")}, + }, + WantStatusUpdates: nil, + PostConditions: []func(*testing.T, *TableRow){ + TopicExists(testTopicID), + }, + }, + } table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, testData map[string]interface{}) controller.Reconciler { + // Insert pubsub client for PostConditions and create fixtures + opts := []pstest.ServerReactorOption{} + if testData != nil { + if testData["server-options"] != nil { + opts = testData["server-options"].([]pstest.ServerReactorOption) + } + } + + srv := pstest.NewServer(opts...) + + psclient, _ := GetTestClientCreateFunc(srv.Addr)(ctx, testProject) + conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) + if err != nil { + panic(fmt.Errorf("failed to dial test pubsub connection: %v", err)) + } + close := func() { + srv.Close() + conn.Close() + } + t.Cleanup(close) + if testData != nil { + InjectPubsubClient(testData, psclient) + if testData["pre"] != nil { + fixtures := testData["pre"].([]PubsubAction) + for _, f := range fixtures { + f(ctx, t, psclient) + } + } + } + // use normal create function or always error one + var createClientFn reconcilerutilspubsub.CreateFn + if testData != nil && testData["client-error"] != nil { + createClientFn = func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) { + return nil, fmt.Errorf(testData["client-error"].(string)) + } + } else { + createClientFn = GetTestClientCreateFunc(srv.Addr) + } pubsubBase := &intevents.PubSubBase{ Base: reconciler.NewBase(ctx, controllerAgentName, cmw), } @@ -668,7 +755,7 @@ func TestAllCases(t *testing.T) { topicLister: listers.GetTopicLister(), serviceLister: listers.GetV1ServiceLister(), publisherImage: testImage, - createClientFn: gpubsub.TestClientCreator(testData["topic"]), + createClientFn: createClientFn, } return topic.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetTopicLister(), r.Recorder, r) })) diff --git a/pkg/reconciler/testing/pstest.go b/pkg/reconciler/testing/pstest.go index fea84c53db..82400d70ef 100644 --- a/pkg/reconciler/testing/pstest.go +++ b/pkg/reconciler/testing/pstest.go @@ -197,19 +197,34 @@ func getPubsubClient(r *rtesting.TableRow) *pubsub.Client { return r.OtherTestData["_psclient"].(*pubsub.Client) } -func TestPubsubClient(ctx context.Context, projectID string) (*pubsub.Client, func()) { - srv := pstest.NewServer() - conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) - if err != nil { - panic(fmt.Errorf("failed to dial test pubsub connection: %v", err)) - } +func TestPubsubClient(ctx context.Context, projectID string, opts ...pstest.ServerReactorOption) (*pubsub.Client, func()) { + srv := pstest.NewServer(opts...) + client, _ := GetTestClientCreateFunc(srv.Addr)(ctx, projectID) close := func() { srv.Close() - conn.Close() + client.Close() } - c, err := pubsub.NewClient(context.Background(), projectID, option.WithGRPCConn(conn)) - if err != nil { - panic(fmt.Errorf("failed to create test pubsub client: %v", err)) + return client, close +} + +// GetTestClientCreateFunc returns a client creation function with same type as pubsub.NewClient. With +// this helper function, multiple clients can be created. This is necessary for any test involving +// mulitple projects. Eg. in sources multiple project is allowed for topics. +func GetTestClientCreateFunc(target string) func(context.Context, string, ...option.ClientOption) (*pubsub.Client, error) { + return func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) { + newConn, err := grpc.Dial(target, grpc.WithInsecure()) + if err != nil { + panic(fmt.Errorf("failed to dial test pubsub connection: %v", err)) + } + // Connection cleanup + go func() { + <-ctx.Done() + newConn.Close() + }() + c, err := pubsub.NewClient(context.Background(), projectID, option.WithGRPCConn(newConn)) + if err != nil { + panic(fmt.Errorf("failed to create test pubsub client: %v", err)) + } + return c, nil } - return c, close } diff --git a/pkg/reconciler/utils/pubsub/types.go b/pkg/reconciler/utils/pubsub/types.go index e2455961bb..d5c19e1b01 100644 --- a/pkg/reconciler/utils/pubsub/types.go +++ b/pkg/reconciler/utils/pubsub/types.go @@ -17,10 +17,17 @@ limitations under the License. package pubsub import ( + "context" + "cloud.google.com/go/pubsub" + "google.golang.org/api/option" "k8s.io/client-go/tools/record" ) +// CreateFn is a factory function to create a Pub/Sub client. It is copied from gclient/pubsub +// to avoid unnecessary dependency on gpubsub. +type CreateFn func(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) + type Reconciler struct { client *pubsub.Client recorder record.EventRecorder