diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 99ac6518422..217cbe12c3d 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1979,3 +1979,62 @@ func TestIntegration_ValidateMessage(t *testing.T) { }) } } + +func TestIntegration_TopicRetention(t *testing.T) { + ctx := context.Background() + c := integrationTestClient(ctx, t) + defer c.Close() + + tc := TopicConfig{ + RetentionDuration: 50 * time.Minute, + } + topic, err := c.CreateTopicWithConfig(ctx, topicIDs.New(), &tc) + if err != nil { + t.Fatal(err) + } + + cfg, err := topic.Config(ctx) + if err != nil { + t.Fatal(err) + } + if !testutil.Equal(cfg, tc) { + t.Fatalf("got: %v, want %v", cfg, tc) + } + + newDur := 11 * time.Minute + cfg, err = topic.Update(ctx, TopicConfigToUpdate{ + RetentionDuration: newDur, + }) + if err != nil { + t.Fatal(err) + } + if got := cfg.RetentionDuration; got != newDur { + t.Fatalf("cfg.RetentionDuration, got: %v, want: %v", got, newDur) + } + + // Create a subscription on the topic and read TopicMessageRetentionDuration. + s, err := c.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{ + Topic: topic, + }) + if err != nil { + t.Fatal(err) + } + sCfg, err := s.Config(ctx) + if err != nil { + t.Fatal(err) + } + if got := sCfg.TopicMessageRetentionDuration; got != newDur { + t.Fatalf("sCfg.TopicMessageRetentionDuration, got: %v, want: %v", got, newDur) + } + + // Clear retention duration by setting to a negative value. + cfg, err = topic.Update(ctx, TopicConfigToUpdate{ + RetentionDuration: -1 * time.Minute, + }) + if err != nil { + t.Fatal(err) + } + if got := cfg.RetentionDuration; got != nil { + t.Fatalf("expected cleared retention duration, got: %v", got) + } +} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 5fa080724a3..4c788bc7882 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -256,6 +256,17 @@ type SubscriptionConfig struct { // FAILED_PRECONDITION. If the subscription is a push subscription, pushes to // the endpoint will not be made. Detached bool + + // TopicMessageRetentionDuration indicates the minimum duration for which a message is + // retained after it is published to the subscription's topic. If this field is + // set, messages published to the subscription's topic in the last + // `TopicMessageRetentionDuration` are always available to subscribers. + // You can enable both topic and subscription retention for the same topic. + // In this situation, the maximum of the retention durations takes effect. + // + // This is an output only field, meaning it will only appear in responses from the backend + // and will be ignored if sent in a request. + TopicMessageRetentionDuration time.Duration } func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { @@ -304,17 +315,18 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC dlp := protoToDLP(pbSub.DeadLetterPolicy) rp := protoToRetryPolicy(pbSub.RetryPolicy) subC := SubscriptionConfig{ - Topic: newTopic(c, pbSub.Topic), - AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds), - RetainAckedMessages: pbSub.RetainAckedMessages, - RetentionDuration: rd, - Labels: pbSub.Labels, - ExpirationPolicy: expirationPolicy, - EnableMessageOrdering: pbSub.EnableMessageOrdering, - DeadLetterPolicy: dlp, - Filter: pbSub.Filter, - RetryPolicy: rp, - Detached: pbSub.Detached, + Topic: newTopic(c, pbSub.Topic), + AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds), + RetainAckedMessages: pbSub.RetainAckedMessages, + RetentionDuration: rd, + Labels: pbSub.Labels, + ExpirationPolicy: expirationPolicy, + EnableMessageOrdering: pbSub.EnableMessageOrdering, + DeadLetterPolicy: dlp, + Filter: pbSub.Filter, + RetryPolicy: rp, + Detached: pbSub.Detached, + TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(), } pc := protoToPushConfig(pbSub.PushConfig) if pc != nil { diff --git a/pubsub/topic.go b/pubsub/topic.go index c991617e303..7fcc80031c1 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -25,6 +25,7 @@ import ( "time" "cloud.google.com/go/iam" + "cloud.google.com/go/internal/optional" ipubsub "cloud.google.com/go/internal/pubsub" "cloud.google.com/go/pubsub/internal/scheduler" gax "github.com/googleapis/gax-go/v2" @@ -37,6 +38,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" ) const ( @@ -143,13 +145,9 @@ func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error // If the topic already exists, an error will be returned. func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) { t := c.Topic(topicID) - _, err := c.pubc.CreateTopic(ctx, &pb.Topic{ - Name: t.name, - Labels: tc.Labels, - MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy), - KmsKeyName: tc.KMSKeyName, - SchemaSettings: schemaSettingsToProto(tc.SchemaSettings), - }) + topic := tc.toProto() + topic.Name = t.name + _, err := c.pubc.CreateTopic(ctx, topic) if err != nil { return nil, err } @@ -200,6 +198,33 @@ type TopicConfig struct { // Schema defines the schema settings upon topic creation. This cannot // be modified after a topic has been created. SchemaSettings *SchemaSettings + + // RetentionDuration configures the minimum duration to retain a message + // after it is published to the topic. If this field is set, messages published + // to the topic in the last `RetentionDuration` are always available to subscribers. + // For instance, it allows any attached subscription to [seek to a + // timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) + // that is up to `RetentionDuration` in the past. If this field is + // not set, message retention is controlled by settings on individual + // subscriptions. Cannot be more than 7 days or less than 10 minutes. + // + // For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention. + RetentionDuration optional.Duration +} + +func (tc *TopicConfig) toProto() *pb.Topic { + var retDur *durationpb.Duration + if tc.RetentionDuration != nil { + retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration)) + } + pbt := &pb.Topic{ + Labels: tc.Labels, + MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy), + KmsKeyName: tc.KMSKeyName, + SchemaSettings: schemaSettingsToProto(tc.SchemaSettings), + MessageRetentionDuration: retDur, + } + return pbt } // TopicConfigToUpdate describes how to update a topic. @@ -219,15 +244,24 @@ type TopicConfigToUpdate struct { // This field has beta status. It is not subject to the stability guarantee // and may change. MessageStoragePolicy *MessageStoragePolicy + + // If set to a positive duration between 10 minutes and 7 days, RetentionDuration is changed. + // If set to a negative value, this clears RetentionDuration from the topic. + // If nil, the retention duration remains unchanged. + RetentionDuration optional.Duration } func protoToTopicConfig(pbt *pb.Topic) TopicConfig { - return TopicConfig{ + tc := TopicConfig{ Labels: pbt.Labels, MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy), KMSKeyName: pbt.KmsKeyName, SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings), } + if pbt.GetMessageRetentionDuration() != nil { + tc.RetentionDuration = pbt.GetMessageRetentionDuration().AsDuration() + } + return tc } // DetachSubscriptionResult is the response for the DetachSubscription method. @@ -316,6 +350,15 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy) paths = append(paths, "message_storage_policy") } + if cfg.RetentionDuration != nil { + r := optional.ToDuration(cfg.RetentionDuration) + pt.MessageRetentionDuration = durationpb.New(r) + if r < 0 { + // Clear MessageRetentionDuration if sentinel value is read. + pt.MessageRetentionDuration = nil + } + paths = append(paths, "message_retention_duration") + } return &pb.UpdateTopicRequest{ Topic: pt, UpdateMask: &fmpb.FieldMask{Paths: paths}, diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index d5d7299aa95..002bb3db786 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -88,6 +88,7 @@ func TestCreateTopicWithConfig(t *testing.T) { Schema: "projects/P/schemas/S", Encoding: EncodingJSON, }, + RetentionDuration: 5 * time.Hour, } topic := mustCreateTopicWithConfig(t, c, id, &want)