Skip to content

Commit

Permalink
feat(pubsub): add topic message retention duration (googleapis#4520)
Browse files Browse the repository at this point in the history
* feat(pubsub): add message retention duration feature

* feat(pubsub): add topic message retention duration

* comment fixes

* switch to optional.Duration

* remove staging endpoint

* address review comments

* remove unused instance of message retention duration in publishsettings

* revert sentinel value check

* clarify doc comments

* fix retention duration comments
  • Loading branch information
hongalex authored and BrennaEpp committed Aug 27, 2021
1 parent 616082a commit 5875174
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 19 deletions.
59 changes: 59 additions & 0 deletions pubsub/integration_test.go
Expand Up @@ -1979,3 +1979,62 @@ func TestIntegration_ValidateMessage(t *testing.T) {
})
}
}

func TestIntegration_TopicRetention(t *testing.T) {
ctx := context.Background()
c := integrationTestClient(ctx, t)
defer c.Close()

tc := TopicConfig{
RetentionDuration: 50 * time.Minute,
}
topic, err := c.CreateTopicWithConfig(ctx, topicIDs.New(), &tc)
if err != nil {
t.Fatal(err)
}

cfg, err := topic.Config(ctx)
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(cfg, tc) {
t.Fatalf("got: %v, want %v", cfg, tc)
}

newDur := 11 * time.Minute
cfg, err = topic.Update(ctx, TopicConfigToUpdate{
RetentionDuration: newDur,
})
if err != nil {
t.Fatal(err)
}
if got := cfg.RetentionDuration; got != newDur {
t.Fatalf("cfg.RetentionDuration, got: %v, want: %v", got, newDur)
}

// Create a subscription on the topic and read TopicMessageRetentionDuration.
s, err := c.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
})
if err != nil {
t.Fatal(err)
}
sCfg, err := s.Config(ctx)
if err != nil {
t.Fatal(err)
}
if got := sCfg.TopicMessageRetentionDuration; got != newDur {
t.Fatalf("sCfg.TopicMessageRetentionDuration, got: %v, want: %v", got, newDur)
}

// Clear retention duration by setting to a negative value.
cfg, err = topic.Update(ctx, TopicConfigToUpdate{
RetentionDuration: -1 * time.Minute,
})
if err != nil {
t.Fatal(err)
}
if got := cfg.RetentionDuration; got != nil {
t.Fatalf("expected cleared retention duration, got: %v", got)
}
}
34 changes: 23 additions & 11 deletions pubsub/subscription.go
Expand Up @@ -256,6 +256,17 @@ type SubscriptionConfig struct {
// FAILED_PRECONDITION. If the subscription is a push subscription, pushes to
// the endpoint will not be made.
Detached bool

// TopicMessageRetentionDuration indicates the minimum duration for which a message is
// retained after it is published to the subscription's topic. If this field is
// set, messages published to the subscription's topic in the last
// `TopicMessageRetentionDuration` are always available to subscribers.
// You can enable both topic and subscription retention for the same topic.
// In this situation, the maximum of the retention durations takes effect.
//
// This is an output only field, meaning it will only appear in responses from the backend
// and will be ignored if sent in a request.
TopicMessageRetentionDuration time.Duration
}

func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
Expand Down Expand Up @@ -304,17 +315,18 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC
dlp := protoToDLP(pbSub.DeadLetterPolicy)
rp := protoToRetryPolicy(pbSub.RetryPolicy)
subC := SubscriptionConfig{
Topic: newTopic(c, pbSub.Topic),
AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
RetainAckedMessages: pbSub.RetainAckedMessages,
RetentionDuration: rd,
Labels: pbSub.Labels,
ExpirationPolicy: expirationPolicy,
EnableMessageOrdering: pbSub.EnableMessageOrdering,
DeadLetterPolicy: dlp,
Filter: pbSub.Filter,
RetryPolicy: rp,
Detached: pbSub.Detached,
Topic: newTopic(c, pbSub.Topic),
AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
RetainAckedMessages: pbSub.RetainAckedMessages,
RetentionDuration: rd,
Labels: pbSub.Labels,
ExpirationPolicy: expirationPolicy,
EnableMessageOrdering: pbSub.EnableMessageOrdering,
DeadLetterPolicy: dlp,
Filter: pbSub.Filter,
RetryPolicy: rp,
Detached: pbSub.Detached,
TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(),
}
pc := protoToPushConfig(pbSub.PushConfig)
if pc != nil {
Expand Down
59 changes: 51 additions & 8 deletions pubsub/topic.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"cloud.google.com/go/iam"
"cloud.google.com/go/internal/optional"
ipubsub "cloud.google.com/go/internal/pubsub"
"cloud.google.com/go/pubsub/internal/scheduler"
gax "github.com/googleapis/gax-go/v2"
Expand All @@ -37,6 +38,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)

const (
Expand Down Expand Up @@ -143,13 +145,9 @@ func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error
// If the topic already exists, an error will be returned.
func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) {
t := c.Topic(topicID)
_, err := c.pubc.CreateTopic(ctx, &pb.Topic{
Name: t.name,
Labels: tc.Labels,
MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
KmsKeyName: tc.KMSKeyName,
SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
})
topic := tc.toProto()
topic.Name = t.name
_, err := c.pubc.CreateTopic(ctx, topic)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -200,6 +198,33 @@ type TopicConfig struct {
// Schema defines the schema settings upon topic creation. This cannot
// be modified after a topic has been created.
SchemaSettings *SchemaSettings

// RetentionDuration configures the minimum duration to retain a message
// after it is published to the topic. If this field is set, messages published
// to the topic in the last `RetentionDuration` are always available to subscribers.
// For instance, it allows any attached subscription to [seek to a
// timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
// that is up to `RetentionDuration` in the past. If this field is
// not set, message retention is controlled by settings on individual
// subscriptions. Cannot be more than 7 days or less than 10 minutes.
//
// For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention.
RetentionDuration optional.Duration
}

func (tc *TopicConfig) toProto() *pb.Topic {
var retDur *durationpb.Duration
if tc.RetentionDuration != nil {
retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration))
}
pbt := &pb.Topic{
Labels: tc.Labels,
MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
KmsKeyName: tc.KMSKeyName,
SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
MessageRetentionDuration: retDur,
}
return pbt
}

// TopicConfigToUpdate describes how to update a topic.
Expand All @@ -219,15 +244,24 @@ type TopicConfigToUpdate struct {
// This field has beta status. It is not subject to the stability guarantee
// and may change.
MessageStoragePolicy *MessageStoragePolicy

// If set to a positive duration between 10 minutes and 7 days, RetentionDuration is changed.
// If set to a negative value, this clears RetentionDuration from the topic.
// If nil, the retention duration remains unchanged.
RetentionDuration optional.Duration
}

func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
return TopicConfig{
tc := TopicConfig{
Labels: pbt.Labels,
MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
KMSKeyName: pbt.KmsKeyName,
SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings),
}
if pbt.GetMessageRetentionDuration() != nil {
tc.RetentionDuration = pbt.GetMessageRetentionDuration().AsDuration()
}
return tc
}

// DetachSubscriptionResult is the response for the DetachSubscription method.
Expand Down Expand Up @@ -316,6 +350,15 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy)
paths = append(paths, "message_storage_policy")
}
if cfg.RetentionDuration != nil {
r := optional.ToDuration(cfg.RetentionDuration)
pt.MessageRetentionDuration = durationpb.New(r)
if r < 0 {
// Clear MessageRetentionDuration if sentinel value is read.
pt.MessageRetentionDuration = nil
}
paths = append(paths, "message_retention_duration")
}
return &pb.UpdateTopicRequest{
Topic: pt,
UpdateMask: &fmpb.FieldMask{Paths: paths},
Expand Down
1 change: 1 addition & 0 deletions pubsub/topic_test.go
Expand Up @@ -88,6 +88,7 @@ func TestCreateTopicWithConfig(t *testing.T) {
Schema: "projects/P/schemas/S",
Encoding: EncodingJSON,
},
RetentionDuration: 5 * time.Hour,
}

topic := mustCreateTopicWithConfig(t, c, id, &want)
Expand Down

0 comments on commit 5875174

Please sign in to comment.