Skip to content

Commit

Permalink
chore(storage): implement Notification methods (#6138)
Browse files Browse the repository at this point in the history
* chore(storage): implement ListNotifications

* add create, get, delete notification ops

* update ListNotifications implementation

* update listNotifications to use InternalFetch

* address comments

* review comments

* remove GetNotification and address comments

* update comments

* move tests
  • Loading branch information
cojenco committed Jun 14, 2022
1 parent 6a742ff commit 61dbbe6
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 0 deletions.
5 changes: 5 additions & 0 deletions storage/client.go
Expand Up @@ -101,6 +101,11 @@ type storageClient interface {
UpdateHMACKey(ctx context.Context, desc *hmacKeyDesc, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error)
CreateHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) (*HMACKey, error)
DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) error

// Notification methods.
ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (map[string]*Notification, error)
CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (*Notification, error)
DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) error
}

// settings contains transport-agnostic configuration for API calls made via
Expand Down
80 changes: 80 additions & 0 deletions storage/client_test.go
Expand Up @@ -791,6 +791,86 @@ func TestOpenWriterEmulated(t *testing.T) {
})
}

func TestListNotificationsEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
// Populate test object.
ctx := context.Background()
_, err := client.CreateBucket(ctx, project, &BucketAttrs{
Name: bucket,
})
if err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}
_, err = client.CreateNotification(ctx, bucket, &Notification{
TopicProjectID: project,
TopicID: "go-storage-notification-test",
PayloadFormat: "JSON_API_V1",
})
if err != nil {
t.Fatalf("client.CreateNotification: %v", err)
}
n, err := client.ListNotifications(ctx, bucket)
if err != nil {
t.Fatalf("client.ListNotifications: %v", err)
}
if want, got := 1, len(n); want != got {
t.Errorf("ListNotifications: got %v, want %v items", n, want)
}
})
}

func TestCreateNotificationEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
// Populate test object.
ctx := context.Background()
_, err := client.CreateBucket(ctx, project, &BucketAttrs{
Name: bucket,
})
if err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}

want := &Notification{
TopicProjectID: project,
TopicID: "go-storage-notification-test",
PayloadFormat: "JSON_API_V1",
}
got, err := client.CreateNotification(ctx, bucket, want)
if err != nil {
t.Fatalf("client.CreateNotification: %v", err)
}
if diff := cmp.Diff(got.TopicID, want.TopicID); diff != "" {
t.Errorf("CreateNotification topic: got(-),want(+):\n%s", diff)
}
})
}

func TestDeleteNotificationEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
// Populate test object.
ctx := context.Background()
_, err := client.CreateBucket(ctx, project, &BucketAttrs{
Name: bucket,
})
if err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}
var n *Notification
n, err = client.CreateNotification(ctx, bucket, &Notification{
TopicProjectID: project,
TopicID: "go-storage-notification-test",
PayloadFormat: "JSON_API_V1",
})
if err != nil {
t.Fatalf("client.CreateNotification: %v", err)
}
err = client.DeleteNotification(ctx, bucket, n.ID)
if err != nil {
t.Fatalf("client.DeleteNotification: %v", err)
}
})
}

func initEmulatorClients() func() error {
noopCloser := func() error { return nil }
if !isEmulatorEnvironmentSet() {
Expand Down
69 changes: 69 additions & 0 deletions storage/grpc_client.go
Expand Up @@ -899,6 +899,75 @@ func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc
return errMethodNotSupported
}

// Notification methods.

func (c *grpcStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.ListNotifications")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
if s.userProject != "" {
ctx = setUserProjectMetadata(ctx, s.userProject)
}
req := &storagepb.ListNotificationsRequest{
Parent: bucketResourceName(globalProjectAlias, bucket),
}
var notifications []*storagepb.Notification
err = run(ctx, func() error {
gitr := c.raw.ListNotifications(ctx, req, s.gax...)
for {
// PageSize is not set and fallbacks to the API default pageSize of 100.
items, nextPageToken, err := gitr.InternalFetch(int(req.GetPageSize()), req.GetPageToken())
if err != nil {
return err
}
notifications = append(notifications, items...)
// If there are no more results, nextPageToken is empty and err is nil.
if nextPageToken == "" {
return err
}
req.PageToken = nextPageToken
}
}, s.retry, s.idempotent, setRetryHeaderGRPC(ctx))
if err != nil {
return nil, err
}

return notificationsToMapFromProto(notifications), nil
}

func (c *grpcStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.CreateNotification")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
req := &storagepb.CreateNotificationRequest{
Parent: bucketResourceName(globalProjectAlias, bucket),
Notification: toProtoNotification(n),
}
var pbn *storagepb.Notification
err = run(ctx, func() error {
var err error
pbn, err = c.raw.CreateNotification(ctx, req, s.gax...)
return err
}, s.retry, s.idempotent, setRetryHeaderGRPC(ctx))
if err != nil {
return nil, err
}
return toNotificationFromProto(pbn), err
}

func (c *grpcStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.DeleteNotification")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
req := &storagepb.DeleteNotificationRequest{Name: id}
return run(ctx, func() error {
return c.raw.DeleteNotification(ctx, req, s.gax...)
}, s.retry, s.idempotent, setRetryHeaderGRPC(ctx))
}

// setUserProjectMetadata appends a project ID to the outgoing Context metadata
// via the x-goog-user-project system parameter defined at
// https://cloud.google.com/apis/docs/system-parameters. This is only for
Expand Down
60 changes: 60 additions & 0 deletions storage/http_client.go
Expand Up @@ -1022,6 +1022,66 @@ func (c *httpStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc
return errMethodNotSupported
}

// Notification methods.

// ListNotifications returns all the Notifications configured for this bucket, as a map indexed by notification ID.
//
// Note: This API does not support pagination. However, entity limits cap the number of notifications on a single bucket,
// so all results will be returned in the first response. See https://cloud.google.com/storage/quotas#buckets.
func (c *httpStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.ListNotifications")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
call := c.raw.Notifications.List(bucket)
if s.userProject != "" {
call.UserProject(s.userProject)
}
var res *raw.Notifications
err = run(ctx, func() error {
res, err = call.Context(ctx).Do()
return err
}, s.retry, true, setRetryHeaderHTTP(call))
if err != nil {
return nil, err
}
return notificationsToMap(res.Items), nil
}

func (c *httpStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.CreateNotification")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
call := c.raw.Notifications.Insert(bucket, toRawNotification(n))
if s.userProject != "" {
call.UserProject(s.userProject)
}
var rn *raw.Notification
err = run(ctx, func() error {
rn, err = call.Context(ctx).Do()
return err
}, s.retry, s.idempotent, setRetryHeaderHTTP(call))
if err != nil {
return nil, err
}
return toNotification(rn), nil
}

func (c *httpStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.DeleteNotification")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
call := c.raw.Notifications.Delete(bucket, id)
if s.userProject != "" {
call.UserProject(s.userProject)
}
return run(ctx, func() error {
return call.Context(ctx).Do()
}, s.retry, s.idempotent, setRetryHeaderHTTP(call))
}

type httpReader struct {
body io.ReadCloser
seen int64
Expand Down
33 changes: 33 additions & 0 deletions storage/notifications.go
Expand Up @@ -22,6 +22,7 @@ import (

"cloud.google.com/go/internal/trace"
raw "google.golang.org/api/storage/v1"
storagepb "google.golang.org/genproto/googleapis/storage/v2"
)

// A Notification describes how to send Cloud PubSub messages when certain
Expand Down Expand Up @@ -91,6 +92,30 @@ func toNotification(rn *raw.Notification) *Notification {
return n
}

func toNotificationFromProto(pbn *storagepb.Notification) *Notification {
n := &Notification{
ID: pbn.GetName(),
EventTypes: pbn.GetEventTypes(),
ObjectNamePrefix: pbn.GetObjectNamePrefix(),
CustomAttributes: pbn.GetCustomAttributes(),
PayloadFormat: pbn.GetPayloadFormat(),
}
n.TopicProjectID, n.TopicID = parseNotificationTopic(pbn.Topic)
return n
}

func toProtoNotification(n *Notification) *storagepb.Notification {
return &storagepb.Notification{
Name: n.ID,
Topic: fmt.Sprintf("//pubsub.googleapis.com/projects/%s/topics/%s",
n.TopicProjectID, n.TopicID),
EventTypes: n.EventTypes,
ObjectNamePrefix: n.ObjectNamePrefix,
CustomAttributes: n.CustomAttributes,
PayloadFormat: n.PayloadFormat,
}
}

var topicRE = regexp.MustCompile("^//pubsub.googleapis.com/projects/([^/]+)/topics/([^/]+)")

// parseNotificationTopic extracts the project and topic IDs from from the full
Expand Down Expand Up @@ -179,6 +204,14 @@ func notificationsToMap(rns []*raw.Notification) map[string]*Notification {
return m
}

func notificationsToMapFromProto(ns []*storagepb.Notification) map[string]*Notification {
m := map[string]*Notification{}
for _, n := range ns {
m[n.Name] = toNotificationFromProto(n)
}
return m
}

// DeleteNotification deletes the notification with the given ID.
func (b *BucketHandle) DeleteNotification(ctx context.Context, id string) (err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Bucket.DeleteNotification")
Expand Down

0 comments on commit 61dbbe6

Please sign in to comment.