Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Remove gpubsub in sources (#1761)
Browse files Browse the repository at this point in the history
* (feat/source): Remove gpubsub from Topic and PullSubscription

Topic and PullSubscription reconcilers still use the gpubsub client, making their tests
different from the Broker and Trigger tests which use pstest. With this, error injection
is done by setting server option to the pubsub testing server. The reconcilers will use
google-cloud-go pubsub directly.

* Fix static pullsubscription, no subscription check yet

* add subscription check to pullsubscription_test.go

* apply change to keda pullsubscription

* Use create function in topic

* use create function for pullsubscription

* remove helper as they are not used here

* Add connection cleanup goroutine, fix typo
  • Loading branch information
Zhongduo Lin (Jimmy) committed Oct 5, 2020
1 parent 46a02fa commit a4cf4ad
Show file tree
Hide file tree
Showing 10 changed files with 626 additions and 459 deletions.
4 changes: 2 additions & 2 deletions pkg/reconciler/intevents/pullsubscription/keda/controller.go
Expand Up @@ -21,6 +21,7 @@ import (

"knative.dev/pkg/injection"

"cloud.google.com/go/pubsub"
"go.uber.org/zap"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/cache"
Expand All @@ -31,7 +32,6 @@ import (
"github.com/google/knative-gcp/pkg/client/injection/ducks/duck/v1/resource"
pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1/pullsubscription"
pullsubscriptionreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/intevents/v1/pullsubscription"
gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub"
"github.com/google/knative-gcp/pkg/reconciler"
"github.com/google/knative-gcp/pkg/reconciler/identity"
"github.com/google/knative-gcp/pkg/reconciler/identity/iam"
Expand Down Expand Up @@ -104,7 +104,7 @@ func newController(
DeploymentLister: deploymentInformer.Lister(),
PullSubscriptionLister: pullSubscriptionInformer.Lister(),
ReceiveAdapterImage: env.ReceiveAdapter,
CreateClientFn: gpubsub.NewClient,
CreateClientFn: pubsub.NewClient,
ControllerAgentName: controllerAgentName,
ResourceGroup: resourceGroup,
},
Expand Down
177 changes: 102 additions & 75 deletions pkg/reconciler/intevents/pullsubscription/keda/pullsubscription_test.go

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions pkg/reconciler/intevents/pullsubscription/reconciler.go
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

"cloud.google.com/go/pubsub"
"go.uber.org/zap"

appsv1 "k8s.io/api/apps/v1"
Expand All @@ -44,10 +45,10 @@ import (

v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1"
listers "github.com/google/knative-gcp/pkg/client/listers/intevents/v1"
gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub"
"github.com/google/knative-gcp/pkg/reconciler/identity"
"github.com/google/knative-gcp/pkg/reconciler/intevents"
"github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/resources"
reconcilerutilspubsub "github.com/google/knative-gcp/pkg/reconciler/utils/pubsub"
"github.com/google/knative-gcp/pkg/tracing"
)

Expand Down Expand Up @@ -92,7 +93,7 @@ type Base struct {

// CreateClientFn is the function used to create the Pub/Sub client that interacts with Pub/Sub.
// This is needed so that we can inject a mock client for UTs purposes.
CreateClientFn gpubsub.CreateFn
CreateClientFn reconcilerutilspubsub.CreateFn

// ReconcileDataPlaneFn is the function used to reconcile the data plane resources.
ReconcileDataPlaneFn ReconcileDataPlaneFunc
Expand Down Expand Up @@ -196,7 +197,7 @@ func (r *Base) reconcileSubscription(ctx context.Context, ps *v1.PullSubscriptio
}

// subConfig is the wanted config based on settings.
subConfig := gpubsub.SubscriptionConfig{
subConfig := pubsub.SubscriptionConfig{
Topic: t,
RetainAckedMessages: ps.Spec.RetainAckedMessages,
}
Expand Down
Expand Up @@ -21,11 +21,11 @@ import (

"knative.dev/pkg/injection"

"cloud.google.com/go/pubsub"
"github.com/google/knative-gcp/pkg/apis/configs/gcpauth"
"github.com/google/knative-gcp/pkg/apis/duck"
v1 "github.com/google/knative-gcp/pkg/apis/intevents/v1"
pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1/pullsubscription"
gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub"
"github.com/google/knative-gcp/pkg/reconciler"
"github.com/google/knative-gcp/pkg/reconciler/identity"
"github.com/google/knative-gcp/pkg/reconciler/identity/iam"
Expand Down Expand Up @@ -100,7 +100,7 @@ func newController(
DeploymentLister: deploymentInformer.Lister(),
PullSubscriptionLister: pullSubscriptionInformer.Lister(),
ReceiveAdapterImage: env.ReceiveAdapter,
CreateClientFn: gpubsub.NewClient,
CreateClientFn: pubsub.NewClient,
ControllerAgentName: controllerAgentName,
ResourceGroup: resourceGroup,
},
Expand Down

0 comments on commit a4cf4ad

Please sign in to comment.