From 281023d7a450811aa4d179f185159638f1cfb894 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 22 Jul 2021 10:25:08 -0700 Subject: [PATCH 01/10] feat(pubsub): add message retention duration feature --- pubsub/go.mod | 2 +- pubsub/go.sum | 2 ++ pubsub/topic.go | 6 ++++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pubsub/go.mod b/pubsub/go.mod index dc9e8b1825e..222b565d1f7 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -12,7 +12,7 @@ require ( golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 google.golang.org/api v0.50.0 - google.golang.org/genproto v0.0.0-20210719143636-1d5a45f8e492 + google.golang.org/genproto v0.0.0-20210722171250-098f7123d4ba google.golang.org/grpc v1.39.0 google.golang.org/protobuf v1.27.1 ) diff --git a/pubsub/go.sum b/pubsub/go.sum index 1eae2791795..ed2fcbbf70b 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -468,6 +468,8 @@ google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+n google.golang.org/genproto v0.0.0-20210713002101-d411969a0d9a/go.mod h1:AxrInvYm1dci+enl5hChSFPOmmUF1+uAa/UsgNRWd7k= google.golang.org/genproto v0.0.0-20210719143636-1d5a45f8e492 h1:7yQQsvnwjfEahbNNEKcBHv3mR+HnB1ctGY/z1JXzx8M= google.golang.org/genproto v0.0.0-20210719143636-1d5a45f8e492/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= +google.golang.org/genproto v0.0.0-20210722171250-098f7123d4ba h1:JT749mkw9Yco9FGCU9YV0I8QTHj1zzJjXbOU9ae8hXw= +google.golang.org/genproto v0.0.0-20210722171250-098f7123d4ba/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= diff --git a/pubsub/topic.go b/pubsub/topic.go index c1bd00cb29c..3edf4a03066 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -100,6 +100,10 @@ type PublishSettings struct { // // Defaults to DefaultPublishSettings.BufferedByteLimit. BufferedByteLimit int + + // MessageRetentinDuration indicates the minimum duration to retain a message after + // it is published to the topic. + MessageRetentionDuration time.Duration } // DefaultPublishSettings holds the default values for topics' PublishSettings. @@ -214,6 +218,8 @@ type TopicConfigToUpdate struct { // This field has beta status. It is not subject to the stability guarantee // and may change. MessageStoragePolicy *MessageStoragePolicy + + MessageRetentionDuration time.Duration } func protoToTopicConfig(pbt *pb.Topic) TopicConfig { From 4d7b13d7f6aab9f22b8d7272f635004b4461b7a5 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 29 Jul 2021 22:11:35 -0700 Subject: [PATCH 02/10] feat(pubsub): add topic message retention duration --- pubsub/go.mod | 2 +- pubsub/go.sum | 2 ++ pubsub/integration_test.go | 60 ++++++++++++++++++++++++++++++++++++++ pubsub/subscription.go | 58 +++++++++++++++++++++--------------- pubsub/topic.go | 37 +++++++++++++++++++---- pubsub/topic_test.go | 1 + 6 files changed, 129 insertions(+), 31 deletions(-) diff --git a/pubsub/go.mod b/pubsub/go.mod index 74e404d52d2..529ee0d3941 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -12,7 +12,7 @@ require ( golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac google.golang.org/api v0.51.0 - google.golang.org/genproto v0.0.0-20210722171250-098f7123d4ba + google.golang.org/genproto v0.0.0-20210726221107-1c22fa41b0c9 google.golang.org/grpc v1.39.0 google.golang.org/protobuf v1.27.1 ) diff --git a/pubsub/go.sum b/pubsub/go.sum index 604ea17705b..14364784140 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -474,6 +474,8 @@ google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea/go.mod h1:AxrInvYm google.golang.org/genproto v0.0.0-20210721163202-f1cecdd8b78a/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= google.golang.org/genproto v0.0.0-20210722171250-098f7123d4ba h1:JT749mkw9Yco9FGCU9YV0I8QTHj1zzJjXbOU9ae8hXw= google.golang.org/genproto v0.0.0-20210722171250-098f7123d4ba/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= +google.golang.org/genproto v0.0.0-20210726221107-1c22fa41b0c9 h1:NUxd/tWRFFonSdqRCXuCM/lCg+jMwe8TQNL9h6NyDvA= +google.golang.org/genproto v0.0.0-20210726221107-1c22fa41b0c9/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index ece13438e08..d40a2e169cd 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1980,3 +1980,63 @@ func TestIntegration_ValidateMessage(t *testing.T) { }) } } + +func TestIntegration_TopicRetention(t *testing.T) { + ctx := context.Background() + opts := withGRPCHeadersAssertion(t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443")) + c := integrationTestClient(ctx, t, opts...) + defer c.Close() + + tc := TopicConfig{ + RetentionDuration: 50 * time.Minute, + } + topic, err := c.CreateTopicWithConfig(ctx, topicIDs.New(), &tc) + if err != nil { + t.Fatal(err) + } + + cfg, err := topic.Config(ctx) + if err != nil { + t.Fatal(err) + } + if !testutil.Equal(cfg, tc) { + t.Fatalf("got: %v, want %v", cfg, tc) + } + + newDur := 11 * time.Minute + cfg, err = topic.Update(ctx, TopicConfigToUpdate{ + RetentionDuration: newDur, + }) + if err != nil { + t.Fatal(err) + } + if got := cfg.RetentionDuration; got != newDur { + t.Fatalf("cfg.RetentionDuration, got: %v, want: %v", got, newDur) + } + + // Create a subscription on the topic and read TopicMessageRetentionDuration. + s, err := c.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{ + Topic: topic, + }) + if err != nil { + t.Fatal(err) + } + sCfg, err := s.Config(ctx) + if err != nil { + t.Fatal(err) + } + if got := sCfg.TopicMessageRetentionDuration; got != newDur { + t.Fatalf("sCfg.TopicMessageRetentionDuration, got: %v, want: %v", got, newDur) + } + + // Clear retention duration by setting to a negative value. + cfg, err = topic.Update(ctx, TopicConfigToUpdate{ + RetentionDuration: -1 * time.Minute, + }) + if err != nil { + t.Fatal(err) + } + if got := cfg.RetentionDuration; got != 0 { + t.Fatalf("expected cleared retention duration, got: %v", got) + } +} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 5eca508ee23..1e5e9456a72 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -257,6 +257,14 @@ type SubscriptionConfig struct { // FAILED_PRECONDITION. If the subscription is a push subscription, pushes to // the endpoint will not be made. Detached bool + + // TopicMessageRetentionDuration indicates the minimum duration for which a message is retained + // after it is published to the subscription's topic. If this field is set, + // messages published to the subscription's topic in the last + // `TopicMessageRetentionDuration` are always available to subscribers. + // + // This field is set only in responses from the server and otherwise ignored. + TopicMessageRetentionDuration time.Duration } func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { @@ -277,19 +285,20 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { pbRetryPolicy = cfg.RetryPolicy.toProto() } return &pb.Subscription{ - Name: name, - Topic: cfg.Topic.name, - PushConfig: pbPushConfig, - AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), - RetainAckedMessages: cfg.RetainAckedMessages, - MessageRetentionDuration: retentionDuration, - Labels: cfg.Labels, - ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), - EnableMessageOrdering: cfg.EnableMessageOrdering, - DeadLetterPolicy: pbDeadLetter, - Filter: cfg.Filter, - RetryPolicy: pbRetryPolicy, - Detached: cfg.Detached, + Name: name, + Topic: cfg.Topic.name, + PushConfig: pbPushConfig, + AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), + RetainAckedMessages: cfg.RetainAckedMessages, + MessageRetentionDuration: retentionDuration, + Labels: cfg.Labels, + ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), + EnableMessageOrdering: cfg.EnableMessageOrdering, + DeadLetterPolicy: pbDeadLetter, + Filter: cfg.Filter, + RetryPolicy: pbRetryPolicy, + Detached: cfg.Detached, + TopicMessageRetentionDuration: durpb.New(cfg.TopicMessageRetentionDuration), } } @@ -305,17 +314,18 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC dlp := protoToDLP(pbSub.DeadLetterPolicy) rp := protoToRetryPolicy(pbSub.RetryPolicy) subC := SubscriptionConfig{ - Topic: newTopic(c, pbSub.Topic), - AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds), - RetainAckedMessages: pbSub.RetainAckedMessages, - RetentionDuration: rd, - Labels: pbSub.Labels, - ExpirationPolicy: expirationPolicy, - EnableMessageOrdering: pbSub.EnableMessageOrdering, - DeadLetterPolicy: dlp, - Filter: pbSub.Filter, - RetryPolicy: rp, - Detached: pbSub.Detached, + Topic: newTopic(c, pbSub.Topic), + AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds), + RetainAckedMessages: pbSub.RetainAckedMessages, + RetentionDuration: rd, + Labels: pbSub.Labels, + ExpirationPolicy: expirationPolicy, + EnableMessageOrdering: pbSub.EnableMessageOrdering, + DeadLetterPolicy: dlp, + Filter: pbSub.Filter, + RetryPolicy: rp, + Detached: pbSub.Detached, + TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(), } pc := protoToPushConfig(pbSub.PushConfig) if pc != nil { diff --git a/pubsub/topic.go b/pubsub/topic.go index f5a7467b86d..9ea7d5f61ce 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -37,6 +37,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" ) const ( @@ -148,11 +149,12 @@ func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) { t := c.Topic(topicID) _, err := c.pubc.CreateTopic(ctx, &pb.Topic{ - Name: t.name, - Labels: tc.Labels, - MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy), - KmsKeyName: tc.KMSKeyName, - SchemaSettings: schemaSettingsToProto(tc.SchemaSettings), + Name: t.name, + Labels: tc.Labels, + MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy), + KmsKeyName: tc.KMSKeyName, + SchemaSettings: schemaSettingsToProto(tc.SchemaSettings), + MessageRetentionDuration: durationpb.New(tc.RetentionDuration), }) if err != nil { return nil, err @@ -204,6 +206,16 @@ type TopicConfig struct { // Schema defines the schema settings upon topic creation. This cannot // be modified after a topic has been created. SchemaSettings *SchemaSettings + + // RetentionDuration configures the minimum duration to retain a message after it is published to + // the topic. If this field is set, messages published to the topic in the + // last `message_retention_duration` are always available to subscribers. For + // instance, it allows any attached subscription to [seek to a + // timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) + // that is up to `message_retention_duration` in the past. If this field is + // not set, message retention is controlled by settings on individual + // subscriptions. Cannot be more than 7 days or less than 10 minutes. + RetentionDuration time.Duration } // TopicConfigToUpdate describes how to update a topic. @@ -224,7 +236,10 @@ type TopicConfigToUpdate struct { // and may change. MessageStoragePolicy *MessageStoragePolicy - MessageRetentionDuration time.Duration + // If set to a positive duration between 10 minutes and 7 days, RetentionDuration is changed. + // + // If set to a negative value, this clears RetentionDuration from the topic. + RetentionDuration time.Duration } func protoToTopicConfig(pbt *pb.Topic) TopicConfig { @@ -233,6 +248,7 @@ func protoToTopicConfig(pbt *pb.Topic) TopicConfig { MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy), KMSKeyName: pbt.KmsKeyName, SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings), + RetentionDuration: pbt.MessageRetentionDuration.AsDuration(), } } @@ -322,6 +338,15 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy) paths = append(paths, "message_storage_policy") } + if cfg.RetentionDuration != 0 { + if cfg.RetentionDuration > 0 { + pt.MessageRetentionDuration = durationpb.New(cfg.RetentionDuration) + } else { + // If set to a negative value, clear MessageRetentionDuration. + pt.MessageRetentionDuration = nil + } + paths = append(paths, "message_retention_duration") + } return &pb.UpdateTopicRequest{ Topic: pt, UpdateMask: &fmpb.FieldMask{Paths: paths}, diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index d5d7299aa95..002bb3db786 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -88,6 +88,7 @@ func TestCreateTopicWithConfig(t *testing.T) { Schema: "projects/P/schemas/S", Encoding: EncodingJSON, }, + RetentionDuration: 5 * time.Hour, } topic := mustCreateTopicWithConfig(t, c, id, &want) From 67f7c5524fc909964f548ce148db67a2ab14b689 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 29 Jul 2021 22:18:24 -0700 Subject: [PATCH 03/10] comment fixes --- pubsub/subscription.go | 27 +++++++++++++-------------- pubsub/topic.go | 3 +-- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 1e5e9456a72..078c8ad9ae1 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -285,20 +285,19 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { pbRetryPolicy = cfg.RetryPolicy.toProto() } return &pb.Subscription{ - Name: name, - Topic: cfg.Topic.name, - PushConfig: pbPushConfig, - AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), - RetainAckedMessages: cfg.RetainAckedMessages, - MessageRetentionDuration: retentionDuration, - Labels: cfg.Labels, - ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), - EnableMessageOrdering: cfg.EnableMessageOrdering, - DeadLetterPolicy: pbDeadLetter, - Filter: cfg.Filter, - RetryPolicy: pbRetryPolicy, - Detached: cfg.Detached, - TopicMessageRetentionDuration: durpb.New(cfg.TopicMessageRetentionDuration), + Name: name, + Topic: cfg.Topic.name, + PushConfig: pbPushConfig, + AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), + RetainAckedMessages: cfg.RetainAckedMessages, + MessageRetentionDuration: retentionDuration, + Labels: cfg.Labels, + ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), + EnableMessageOrdering: cfg.EnableMessageOrdering, + DeadLetterPolicy: pbDeadLetter, + Filter: cfg.Filter, + RetryPolicy: pbRetryPolicy, + Detached: cfg.Detached, } } diff --git a/pubsub/topic.go b/pubsub/topic.go index 9ea7d5f61ce..5bf219ae15b 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -102,7 +102,7 @@ type PublishSettings struct { // Defaults to DefaultPublishSettings.BufferedByteLimit. BufferedByteLimit int - // MessageRetentinDuration indicates the minimum duration to retain a message after + // MessageRetentionDuration indicates the minimum duration to retain a message after // it is published to the topic. MessageRetentionDuration time.Duration } @@ -237,7 +237,6 @@ type TopicConfigToUpdate struct { MessageStoragePolicy *MessageStoragePolicy // If set to a positive duration between 10 minutes and 7 days, RetentionDuration is changed. - // // If set to a negative value, this clears RetentionDuration from the topic. RetentionDuration time.Duration } From a574ca2e69dc58e4396a858390d4c72fda77c161 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 20 Aug 2021 13:17:35 -0700 Subject: [PATCH 04/10] switch to optional.Duration --- pubsub/integration_test.go | 2 +- pubsub/topic.go | 48 +++++++++++++++++++++++++------------- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 07503834187..89bdf56edac 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -2035,7 +2035,7 @@ func TestIntegration_TopicRetention(t *testing.T) { if err != nil { t.Fatal(err) } - if got := cfg.RetentionDuration; got != 0 { + if got := cfg.RetentionDuration; got != nil { t.Fatalf("expected cleared retention duration, got: %v", got) } } diff --git a/pubsub/topic.go b/pubsub/topic.go index 5bf219ae15b..c49bfc1c54e 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -25,6 +25,7 @@ import ( "time" "cloud.google.com/go/iam" + "cloud.google.com/go/internal/optional" ipubsub "cloud.google.com/go/internal/pubsub" "cloud.google.com/go/pubsub/internal/scheduler" gax "github.com/googleapis/gax-go/v2" @@ -148,14 +149,9 @@ func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error // If the topic already exists, an error will be returned. func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) { t := c.Topic(topicID) - _, err := c.pubc.CreateTopic(ctx, &pb.Topic{ - Name: t.name, - Labels: tc.Labels, - MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy), - KmsKeyName: tc.KMSKeyName, - SchemaSettings: schemaSettingsToProto(tc.SchemaSettings), - MessageRetentionDuration: durationpb.New(tc.RetentionDuration), - }) + topic := tc.toProto() + topic.Name = t.name + _, err := c.pubc.CreateTopic(ctx, topic) if err != nil { return nil, err } @@ -215,7 +211,22 @@ type TopicConfig struct { // that is up to `message_retention_duration` in the past. If this field is // not set, message retention is controlled by settings on individual // subscriptions. Cannot be more than 7 days or less than 10 minutes. - RetentionDuration time.Duration + RetentionDuration optional.Duration +} + +func (tc *TopicConfig) toProto() *pb.Topic { + var retDur *durationpb.Duration + if tc.RetentionDuration != nil { + retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration)) + } + pbt := &pb.Topic{ + Labels: tc.Labels, + MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy), + KmsKeyName: tc.KMSKeyName, + SchemaSettings: schemaSettingsToProto(tc.SchemaSettings), + MessageRetentionDuration: retDur, + } + return pbt } // TopicConfigToUpdate describes how to update a topic. @@ -238,17 +249,21 @@ type TopicConfigToUpdate struct { // If set to a positive duration between 10 minutes and 7 days, RetentionDuration is changed. // If set to a negative value, this clears RetentionDuration from the topic. - RetentionDuration time.Duration + // If nil, the retention duration remains unchanged. + RetentionDuration optional.Duration } func protoToTopicConfig(pbt *pb.Topic) TopicConfig { - return TopicConfig{ + tc := TopicConfig{ Labels: pbt.Labels, MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy), KMSKeyName: pbt.KmsKeyName, SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings), - RetentionDuration: pbt.MessageRetentionDuration.AsDuration(), } + if pbt.MessageRetentionDuration != nil { + tc.RetentionDuration = pbt.MessageRetentionDuration.AsDuration() + } + return tc } // DetachSubscriptionResult is the response for the DetachSubscription method. @@ -337,11 +352,12 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy) paths = append(paths, "message_storage_policy") } - if cfg.RetentionDuration != 0 { - if cfg.RetentionDuration > 0 { - pt.MessageRetentionDuration = durationpb.New(cfg.RetentionDuration) + if cfg.RetentionDuration != nil { + r := optional.ToDuration(cfg.RetentionDuration) + if r > 0 { + pt.MessageRetentionDuration = durationpb.New(r) } else { - // If set to a negative value, clear MessageRetentionDuration. + // Clear MessageRetentionDuration. pt.MessageRetentionDuration = nil } paths = append(paths, "message_retention_duration") From f0635a2b37b7c2a11a37b6b569bceb60a186d226 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 24 Aug 2021 11:09:14 -0700 Subject: [PATCH 05/10] remove staging endpoint --- pubsub/integration_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 89bdf56edac..217cbe12c3d 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1982,8 +1982,7 @@ func TestIntegration_ValidateMessage(t *testing.T) { func TestIntegration_TopicRetention(t *testing.T) { ctx := context.Background() - opts := withGRPCHeadersAssertion(t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443")) - c := integrationTestClient(ctx, t, opts...) + c := integrationTestClient(ctx, t) defer c.Close() tc := TopicConfig{ From cbd6456b6c7989dfde35fee05ab29860cb8931a7 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 24 Aug 2021 11:30:34 -0700 Subject: [PATCH 06/10] address review comments --- pubsub/subscription.go | 10 +++++++--- pubsub/topic.go | 5 ++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index beb693b6ab4..8ce62e0e8a5 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -257,10 +257,14 @@ type SubscriptionConfig struct { // the endpoint will not be made. Detached bool - // TopicMessageRetentionDuration indicates the minimum duration for which a message is retained - // after it is published to the subscription's topic. If this field is set, - // messages published to the subscription's topic in the last + // TopicMessageRetentionDuration is the subscription's topic's message + // retention duration populated by the server and is read only.bundledMessage + // Topic message retention indicates the minimum duration for which a message is + // retained after it is published to the subscription's topic. If this field is + // set, messages published to the subscription's topic in the last // `TopicMessageRetentionDuration` are always available to subscribers. + // You can enable both topic and subscription retention for the same topic. + // In this situation, the maximum of the retention durations takes effect. // // This field is set only in responses from the server and otherwise ignored. TopicMessageRetentionDuration time.Duration diff --git a/pubsub/topic.go b/pubsub/topic.go index c49bfc1c54e..43fcd78eb45 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -354,9 +354,8 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { } if cfg.RetentionDuration != nil { r := optional.ToDuration(cfg.RetentionDuration) - if r > 0 { - pt.MessageRetentionDuration = durationpb.New(r) - } else { + pt.MessageRetentionDuration = durationpb.New(r) + if r <= 0 { // Clear MessageRetentionDuration. pt.MessageRetentionDuration = nil } From fe71f8b3644b85db7279ab65da1b25c61c13602e Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 24 Aug 2021 11:36:13 -0700 Subject: [PATCH 07/10] remove unused instance of message retention duration in publishsettings --- pubsub/topic.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/pubsub/topic.go b/pubsub/topic.go index 43fcd78eb45..c552841323e 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -102,10 +102,6 @@ type PublishSettings struct { // // Defaults to DefaultPublishSettings.BufferedByteLimit. BufferedByteLimit int - - // MessageRetentionDuration indicates the minimum duration to retain a message after - // it is published to the topic. - MessageRetentionDuration time.Duration } // DefaultPublishSettings holds the default values for topics' PublishSettings. @@ -203,14 +199,16 @@ type TopicConfig struct { // be modified after a topic has been created. SchemaSettings *SchemaSettings - // RetentionDuration configures the minimum duration to retain a message after it is published to - // the topic. If this field is set, messages published to the topic in the - // last `message_retention_duration` are always available to subscribers. For - // instance, it allows any attached subscription to [seek to a + // RetentionDuration configures the minimum duration to retain a message + // after it is published to the topic. If this field is set, messages published + // to the topic in the last `message_retention_duration` are always available to subscribers. + // For instance, it allows any attached subscription to [seek to a // timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) // that is up to `message_retention_duration` in the past. If this field is // not set, message retention is controlled by settings on individual // subscriptions. Cannot be more than 7 days or less than 10 minutes. + // + // For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_retention. RetentionDuration optional.Duration } From 7808596b67a291684b21f1443e679ab4fa169a4e Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 24 Aug 2021 12:06:38 -0700 Subject: [PATCH 08/10] revert sentinel value check --- pubsub/topic.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pubsub/topic.go b/pubsub/topic.go index c552841323e..cdc61a27852 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -258,8 +258,8 @@ func protoToTopicConfig(pbt *pb.Topic) TopicConfig { KMSKeyName: pbt.KmsKeyName, SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings), } - if pbt.MessageRetentionDuration != nil { - tc.RetentionDuration = pbt.MessageRetentionDuration.AsDuration() + if pbt.GetMessageRetentionDuration() != nil { + tc.RetentionDuration = pbt.GetMessageRetentionDuration().AsDuration() } return tc } @@ -353,8 +353,8 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { if cfg.RetentionDuration != nil { r := optional.ToDuration(cfg.RetentionDuration) pt.MessageRetentionDuration = durationpb.New(r) - if r <= 0 { - // Clear MessageRetentionDuration. + if r < 0 { + // Clear MessageRetentionDuration if sentinel value is read. pt.MessageRetentionDuration = nil } paths = append(paths, "message_retention_duration") From 0728715f11777d54e01b80f927ad77f256a6ddaa Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 24 Aug 2021 12:08:22 -0700 Subject: [PATCH 09/10] clarify doc comments --- pubsub/subscription.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 8ce62e0e8a5..4c788bc7882 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -257,16 +257,15 @@ type SubscriptionConfig struct { // the endpoint will not be made. Detached bool - // TopicMessageRetentionDuration is the subscription's topic's message - // retention duration populated by the server and is read only.bundledMessage - // Topic message retention indicates the minimum duration for which a message is + // TopicMessageRetentionDuration indicates the minimum duration for which a message is // retained after it is published to the subscription's topic. If this field is // set, messages published to the subscription's topic in the last // `TopicMessageRetentionDuration` are always available to subscribers. // You can enable both topic and subscription retention for the same topic. // In this situation, the maximum of the retention durations takes effect. // - // This field is set only in responses from the server and otherwise ignored. + // This is an output only field, meaning it will only appear in responses from the backend + // and will be ignored if sent in a request. TopicMessageRetentionDuration time.Duration } From 92be15fe90565d687347ab12d75d27500b409602 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 24 Aug 2021 15:42:52 -0700 Subject: [PATCH 10/10] fix retention duration comments --- pubsub/topic.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pubsub/topic.go b/pubsub/topic.go index cdc61a27852..7fcc80031c1 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -201,14 +201,14 @@ type TopicConfig struct { // RetentionDuration configures the minimum duration to retain a message // after it is published to the topic. If this field is set, messages published - // to the topic in the last `message_retention_duration` are always available to subscribers. + // to the topic in the last `RetentionDuration` are always available to subscribers. // For instance, it allows any attached subscription to [seek to a // timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) - // that is up to `message_retention_duration` in the past. If this field is + // that is up to `RetentionDuration` in the past. If this field is // not set, message retention is controlled by settings on individual // subscriptions. Cannot be more than 7 days or less than 10 minutes. // - // For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_retention. + // For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention. RetentionDuration optional.Duration }