Skip to content

Commit

Permalink
feat(pubsublite): add sample to create a Pub/Sub Lite export subscrip…
Browse files Browse the repository at this point in the history
…tion (#2742)

* feat(pubsublite): add sample to create a Pub/Sub Lite export subscription

* Add seek target

* Update pubsublite version

* Update go.mod and go.sum
  • Loading branch information
tmdiep committed Dec 13, 2022
1 parent de76b97 commit 5a5776e
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 7 deletions.
66 changes: 66 additions & 0 deletions pubsublite/admin/create_pubsub_export_subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package admin

// [START pubsublite_create_pubsub_export_subscription]
import (
"context"
"fmt"
"io"

"cloud.google.com/go/pubsublite"
)

func createPubsubExportSubscription(w io.Writer, projectID, region, location, topicID, subID, pubsubTopicID string) error {
// projectID := "my-project-id"
// region := "us-central1"
// NOTE: location can be either a region ("us-central1") or a zone ("us-central1-a")
// For a list of valid locations, see https://cloud.google.com/pubsub/lite/docs/locations.
// location := "us-central1"
// NOTE: topic and subscription must be in the same region/zone (e.g. "us-central1-a")
// topicID := "my-topic"
// subID := "my-subscription"
// pubsubTopicID := "destination-topic-id"
ctx := context.Background()
client, err := pubsublite.NewAdminClient(ctx, region)
if err != nil {
return fmt.Errorf("pubsublite.NewAdminClient: %v", err)
}
defer client.Close()

// Initialize the subscription to the oldest retained messages for each
// partition.
targetLocation := pubsublite.AtTargetLocation(pubsublite.Beginning)

sub, err := client.CreateSubscription(ctx, pubsublite.SubscriptionConfig{
Name: fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, location, subID),
Topic: fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, location, topicID),
DeliveryRequirement: pubsublite.DeliverImmediately, // Can also be DeliverAfterStored.
// Configures an export subscription that writes messages to a Pub/Sub topic.
ExportConfig: &pubsublite.ExportConfig{
DesiredState: pubsublite.ExportActive, // Can also be ExportPaused.
Destination: &pubsublite.PubSubDestinationConfig{
Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, pubsubTopicID),
},
},
}, targetLocation)
if err != nil {
return fmt.Errorf("client.CreateSubscription got err: %v", err)
}
fmt.Fprintf(w, "Created export subscription: %s\n", sub.Name)
return nil
}

// [END pubsublite_create_pubsub_export_subscription]
32 changes: 31 additions & 1 deletion pubsublite/admin/pubsublite_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (
"testing"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"github.com/GoogleCloudPlatform/golang-samples/internal/testutil"
"github.com/GoogleCloudPlatform/golang-samples/pubsublite/internal/psltest"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/api/cloudresourcemanager/v1"
cloudresourcemanager "google.golang.org/api/cloudresourcemanager/v1"
)

const (
Expand Down Expand Up @@ -202,6 +203,21 @@ func TestSubscriptionAdmin(t *testing.T) {
subID := resourcePrefix + uuid.NewString()
subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projNumber, testZone, subID)

exportSubID := resourcePrefix + "-export-" + uuid.NewString()
exportSubPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projNumber, testZone, exportSubID)

// Destination Pub/Sub topic for testing export subscriptions.
pubsubClient, err := pubsub.NewClient(ctx, tc.ProjectID)
if err != nil {
t.Fatalf("failed to create pubsub client: %v", err)
}
defer pubsubClient.Close()
pubsubTopic, err := pubsubClient.CreateTopic(ctx, topicID)
if err != nil {
t.Fatalf("CreateTopic: %v", err)
}
defer pubsubTopic.Delete(ctx)

t.Run("CreateSubscription", func(t *testing.T) {
buf := new(bytes.Buffer)
err := createSubscription(buf, tc.ProjectID, testRegion, testZone, topicID, subID)
Expand All @@ -215,6 +231,19 @@ func TestSubscriptionAdmin(t *testing.T) {
}
})

t.Run("CreatePubsubExportSubscription", func(t *testing.T) {
buf := new(bytes.Buffer)
err := createPubsubExportSubscription(buf, tc.ProjectID, testRegion, testZone, topicID, exportSubID, topicID)
if err != nil {
t.Fatalf("createPubsubExportSubscription: %v", err)
}
got := buf.String()
want := fmt.Sprintf("Created export subscription: %s\n", exportSubPath)
if diff := cmp.Diff(want, got); diff != "" {
t.Fatalf("createPubsubExportSubscription() mismatch: -want, +got:\n%s", diff)
}
})

t.Run("GetSubscription", func(t *testing.T) {
testutil.Retry(t, 3, 5*time.Second, func(r *testutil.R) {
buf := new(bytes.Buffer)
Expand Down Expand Up @@ -276,6 +305,7 @@ func TestSubscriptionAdmin(t *testing.T) {
}
})

client.DeleteSubscription(ctx, exportSubPath)
client.DeleteTopic(ctx, topicPath)
}

Expand Down
4 changes: 2 additions & 2 deletions pubsublite/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
cloud.google.com/go/pubsub v1.28.0
cloud.google.com/go/pubsublite v1.5.0
cloud.google.com/go/pubsublite v1.6.0
github.com/GoogleCloudPlatform/golang-samples v0.0.0-20221212191713-33c1a699be11
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
Expand All @@ -13,7 +13,7 @@ require (
)

require (
cloud.google.com/go v0.105.0 // indirect
cloud.google.com/go v0.107.0 // indirect
cloud.google.com/go/compute v1.13.0 // indirect
cloud.google.com/go/compute/metadata v0.2.2 // indirect
cloud.google.com/go/iam v0.8.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions pubsublite/go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y=
cloud.google.com/go v0.105.0/go.mod h1:PrLgOJNe5nfE9UMxKxgXj4mD3voiP+YQ6gdt6KMFOKM=
cloud.google.com/go v0.107.0 h1:qkj22L7bgkl6vIeZDlOY2po43Mx/TIa2Wsa7VR+PEww=
cloud.google.com/go v0.107.0/go.mod h1:wpc2eNrD7hXUTy8EKS10jkxpZBjASrORK7goS+3YX2I=
cloud.google.com/go/compute v1.13.0 h1:AYrLkB8NPdDRslNp4Jxmzrhdr03fUAIDbiGFjLWowoU=
cloud.google.com/go/compute v1.13.0/go.mod h1:5aPTS0cUNMIc1CE546K+Th6weJUNQErARyZtRXDJ8GE=
cloud.google.com/go/compute/metadata v0.2.2 h1:aWKAjYaBaOSrpKl57+jnS/3fJRQnxL7TvR/u1VVbt6k=
Expand All @@ -12,8 +12,8 @@ cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+
cloud.google.com/go/longrunning v0.3.0/go.mod h1:qth9Y41RRSUE69rDcOn6DdK3HfQfsUI0YSmW3iIlLJc=
cloud.google.com/go/pubsub v1.28.0 h1:XzabfdPx/+eNrsVVGLFgeUnQQKPGkMb8klRCeYK52is=
cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8=
cloud.google.com/go/pubsublite v1.5.0 h1:iqrD8vp3giTb7hI1q4TQQGj77cj8zzgmMPsTZtLnprM=
cloud.google.com/go/pubsublite v1.5.0/go.mod h1:xapqNQ1CuLfGi23Yda/9l4bBCKz/wC3KIJ5gKcxveZg=
cloud.google.com/go/pubsublite v1.6.0 h1:qh04RCSOnQDVHYmzT74ANu8WR9czAXG3Jl3TV4iR5no=
cloud.google.com/go/pubsublite v1.6.0/go.mod h1:1eFCS0U11xlOuMFV/0iBqw3zP12kddMeCbj/F3FSj9k=
cloud.google.com/go/storage v1.28.0 h1:DLrIZ6xkeZX6K70fU/boWx5INJumt6f+nwwWSHXzzGY=
cloud.google.com/go/storage v1.28.0/go.mod h1:qlgZML35PXA3zoEnIkiPLY4/TOkUleufRlu6qmcf7sI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down

0 comments on commit 5a5776e

Please sign in to comment.