From 61dbbe667643aa22e7497e7b45c86fb365bffbcb Mon Sep 17 00:00:00 2001 From: cojenco Date: Tue, 14 Jun 2022 12:54:58 -0700 Subject: [PATCH] chore(storage): implement Notification methods (#6138) * chore(storage): implement ListNotifications * add create, get, delete notification ops * update ListNotifications implementation * update listNotifications to use InternalFetch * address comments * review comments * remove GetNotification and address comments * update comments * move tests --- storage/client.go | 5 +++ storage/client_test.go | 80 ++++++++++++++++++++++++++++++++++++++++ storage/grpc_client.go | 69 ++++++++++++++++++++++++++++++++++ storage/http_client.go | 60 ++++++++++++++++++++++++++++++ storage/notifications.go | 33 +++++++++++++++++ 5 files changed, 247 insertions(+) diff --git a/storage/client.go b/storage/client.go index 4a6ee15c50d..f26a52a765e 100644 --- a/storage/client.go +++ b/storage/client.go @@ -101,6 +101,11 @@ type storageClient interface { UpdateHMACKey(ctx context.Context, desc *hmacKeyDesc, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) CreateHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) (*HMACKey, error) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) error + + // Notification methods. + ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (map[string]*Notification, error) + CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (*Notification, error) + DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) error } // settings contains transport-agnostic configuration for API calls made via diff --git a/storage/client_test.go b/storage/client_test.go index 3f8cb78bfc3..702808a85c7 100644 --- a/storage/client_test.go +++ b/storage/client_test.go @@ -791,6 +791,86 @@ func TestOpenWriterEmulated(t *testing.T) { }) } +func TestListNotificationsEmulated(t *testing.T) { + transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { + // Populate test object. + ctx := context.Background() + _, err := client.CreateBucket(ctx, project, &BucketAttrs{ + Name: bucket, + }) + if err != nil { + t.Fatalf("client.CreateBucket: %v", err) + } + _, err = client.CreateNotification(ctx, bucket, &Notification{ + TopicProjectID: project, + TopicID: "go-storage-notification-test", + PayloadFormat: "JSON_API_V1", + }) + if err != nil { + t.Fatalf("client.CreateNotification: %v", err) + } + n, err := client.ListNotifications(ctx, bucket) + if err != nil { + t.Fatalf("client.ListNotifications: %v", err) + } + if want, got := 1, len(n); want != got { + t.Errorf("ListNotifications: got %v, want %v items", n, want) + } + }) +} + +func TestCreateNotificationEmulated(t *testing.T) { + transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { + // Populate test object. + ctx := context.Background() + _, err := client.CreateBucket(ctx, project, &BucketAttrs{ + Name: bucket, + }) + if err != nil { + t.Fatalf("client.CreateBucket: %v", err) + } + + want := &Notification{ + TopicProjectID: project, + TopicID: "go-storage-notification-test", + PayloadFormat: "JSON_API_V1", + } + got, err := client.CreateNotification(ctx, bucket, want) + if err != nil { + t.Fatalf("client.CreateNotification: %v", err) + } + if diff := cmp.Diff(got.TopicID, want.TopicID); diff != "" { + t.Errorf("CreateNotification topic: got(-),want(+):\n%s", diff) + } + }) +} + +func TestDeleteNotificationEmulated(t *testing.T) { + transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { + // Populate test object. + ctx := context.Background() + _, err := client.CreateBucket(ctx, project, &BucketAttrs{ + Name: bucket, + }) + if err != nil { + t.Fatalf("client.CreateBucket: %v", err) + } + var n *Notification + n, err = client.CreateNotification(ctx, bucket, &Notification{ + TopicProjectID: project, + TopicID: "go-storage-notification-test", + PayloadFormat: "JSON_API_V1", + }) + if err != nil { + t.Fatalf("client.CreateNotification: %v", err) + } + err = client.DeleteNotification(ctx, bucket, n.ID) + if err != nil { + t.Fatalf("client.DeleteNotification: %v", err) + } + }) +} + func initEmulatorClients() func() error { noopCloser := func() error { return nil } if !isEmulatorEnvironmentSet() { diff --git a/storage/grpc_client.go b/storage/grpc_client.go index dced706a460..cc246cf7247 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -899,6 +899,75 @@ func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc return errMethodNotSupported } +// Notification methods. + +func (c *grpcStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) { + ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.ListNotifications") + defer func() { trace.EndSpan(ctx, err) }() + + s := callSettings(c.settings, opts...) + if s.userProject != "" { + ctx = setUserProjectMetadata(ctx, s.userProject) + } + req := &storagepb.ListNotificationsRequest{ + Parent: bucketResourceName(globalProjectAlias, bucket), + } + var notifications []*storagepb.Notification + err = run(ctx, func() error { + gitr := c.raw.ListNotifications(ctx, req, s.gax...) + for { + // PageSize is not set and fallbacks to the API default pageSize of 100. + items, nextPageToken, err := gitr.InternalFetch(int(req.GetPageSize()), req.GetPageToken()) + if err != nil { + return err + } + notifications = append(notifications, items...) + // If there are no more results, nextPageToken is empty and err is nil. + if nextPageToken == "" { + return err + } + req.PageToken = nextPageToken + } + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + if err != nil { + return nil, err + } + + return notificationsToMapFromProto(notifications), nil +} + +func (c *grpcStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) { + ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.CreateNotification") + defer func() { trace.EndSpan(ctx, err) }() + + s := callSettings(c.settings, opts...) + req := &storagepb.CreateNotificationRequest{ + Parent: bucketResourceName(globalProjectAlias, bucket), + Notification: toProtoNotification(n), + } + var pbn *storagepb.Notification + err = run(ctx, func() error { + var err error + pbn, err = c.raw.CreateNotification(ctx, req, s.gax...) + return err + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) + if err != nil { + return nil, err + } + return toNotificationFromProto(pbn), err +} + +func (c *grpcStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) { + ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.DeleteNotification") + defer func() { trace.EndSpan(ctx, err) }() + + s := callSettings(c.settings, opts...) + req := &storagepb.DeleteNotificationRequest{Name: id} + return run(ctx, func() error { + return c.raw.DeleteNotification(ctx, req, s.gax...) + }, s.retry, s.idempotent, setRetryHeaderGRPC(ctx)) +} + // setUserProjectMetadata appends a project ID to the outgoing Context metadata // via the x-goog-user-project system parameter defined at // https://cloud.google.com/apis/docs/system-parameters. This is only for diff --git a/storage/http_client.go b/storage/http_client.go index 0932d556430..45c175f8988 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -1022,6 +1022,66 @@ func (c *httpStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc return errMethodNotSupported } +// Notification methods. + +// ListNotifications returns all the Notifications configured for this bucket, as a map indexed by notification ID. +// +// Note: This API does not support pagination. However, entity limits cap the number of notifications on a single bucket, +// so all results will be returned in the first response. See https://cloud.google.com/storage/quotas#buckets. +func (c *httpStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) { + ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.ListNotifications") + defer func() { trace.EndSpan(ctx, err) }() + + s := callSettings(c.settings, opts...) + call := c.raw.Notifications.List(bucket) + if s.userProject != "" { + call.UserProject(s.userProject) + } + var res *raw.Notifications + err = run(ctx, func() error { + res, err = call.Context(ctx).Do() + return err + }, s.retry, true, setRetryHeaderHTTP(call)) + if err != nil { + return nil, err + } + return notificationsToMap(res.Items), nil +} + +func (c *httpStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) { + ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.CreateNotification") + defer func() { trace.EndSpan(ctx, err) }() + + s := callSettings(c.settings, opts...) + call := c.raw.Notifications.Insert(bucket, toRawNotification(n)) + if s.userProject != "" { + call.UserProject(s.userProject) + } + var rn *raw.Notification + err = run(ctx, func() error { + rn, err = call.Context(ctx).Do() + return err + }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) + if err != nil { + return nil, err + } + return toNotification(rn), nil +} + +func (c *httpStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) { + ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.DeleteNotification") + defer func() { trace.EndSpan(ctx, err) }() + + s := callSettings(c.settings, opts...) + call := c.raw.Notifications.Delete(bucket, id) + if s.userProject != "" { + call.UserProject(s.userProject) + } + return run(ctx, func() error { + return call.Context(ctx).Do() + }, s.retry, s.idempotent, setRetryHeaderHTTP(call)) +} + type httpReader struct { body io.ReadCloser seen int64 diff --git a/storage/notifications.go b/storage/notifications.go index dd43822b6a3..fe3e3ae069c 100644 --- a/storage/notifications.go +++ b/storage/notifications.go @@ -22,6 +22,7 @@ import ( "cloud.google.com/go/internal/trace" raw "google.golang.org/api/storage/v1" + storagepb "google.golang.org/genproto/googleapis/storage/v2" ) // A Notification describes how to send Cloud PubSub messages when certain @@ -91,6 +92,30 @@ func toNotification(rn *raw.Notification) *Notification { return n } +func toNotificationFromProto(pbn *storagepb.Notification) *Notification { + n := &Notification{ + ID: pbn.GetName(), + EventTypes: pbn.GetEventTypes(), + ObjectNamePrefix: pbn.GetObjectNamePrefix(), + CustomAttributes: pbn.GetCustomAttributes(), + PayloadFormat: pbn.GetPayloadFormat(), + } + n.TopicProjectID, n.TopicID = parseNotificationTopic(pbn.Topic) + return n +} + +func toProtoNotification(n *Notification) *storagepb.Notification { + return &storagepb.Notification{ + Name: n.ID, + Topic: fmt.Sprintf("//pubsub.googleapis.com/projects/%s/topics/%s", + n.TopicProjectID, n.TopicID), + EventTypes: n.EventTypes, + ObjectNamePrefix: n.ObjectNamePrefix, + CustomAttributes: n.CustomAttributes, + PayloadFormat: n.PayloadFormat, + } +} + var topicRE = regexp.MustCompile("^//pubsub.googleapis.com/projects/([^/]+)/topics/([^/]+)") // parseNotificationTopic extracts the project and topic IDs from from the full @@ -179,6 +204,14 @@ func notificationsToMap(rns []*raw.Notification) map[string]*Notification { return m } +func notificationsToMapFromProto(ns []*storagepb.Notification) map[string]*Notification { + m := map[string]*Notification{} + for _, n := range ns { + m[n.Name] = toNotificationFromProto(n) + } + return m +} + // DeleteNotification deletes the notification with the given ID. func (b *BucketHandle) DeleteNotification(ctx context.Context, id string) (err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Bucket.DeleteNotification")