Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add topic message retention duration #4520

Merged
merged 19 commits into from Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 59 additions & 0 deletions pubsub/integration_test.go
Expand Up @@ -1979,3 +1979,62 @@ func TestIntegration_ValidateMessage(t *testing.T) {
})
}
}

func TestIntegration_TopicRetention(t *testing.T) {
ctx := context.Background()
c := integrationTestClient(ctx, t)
defer c.Close()

tc := TopicConfig{
RetentionDuration: 50 * time.Minute,
}
topic, err := c.CreateTopicWithConfig(ctx, topicIDs.New(), &tc)
if err != nil {
t.Fatal(err)
}

cfg, err := topic.Config(ctx)
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(cfg, tc) {
t.Fatalf("got: %v, want %v", cfg, tc)
}

newDur := 11 * time.Minute
cfg, err = topic.Update(ctx, TopicConfigToUpdate{
RetentionDuration: newDur,
})
if err != nil {
t.Fatal(err)
}
if got := cfg.RetentionDuration; got != newDur {
t.Fatalf("cfg.RetentionDuration, got: %v, want: %v", got, newDur)
}

// Create a subscription on the topic and read TopicMessageRetentionDuration.
s, err := c.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
})
if err != nil {
t.Fatal(err)
}
sCfg, err := s.Config(ctx)
if err != nil {
t.Fatal(err)
}
if got := sCfg.TopicMessageRetentionDuration; got != newDur {
t.Fatalf("sCfg.TopicMessageRetentionDuration, got: %v, want: %v", got, newDur)
}

// Clear retention duration by setting to a negative value.
cfg, err = topic.Update(ctx, TopicConfigToUpdate{
RetentionDuration: -1 * time.Minute,
})
if err != nil {
t.Fatal(err)
}
if got := cfg.RetentionDuration; got != nil {
t.Fatalf("expected cleared retention duration, got: %v", got)
}
}
34 changes: 23 additions & 11 deletions pubsub/subscription.go
Expand Up @@ -256,6 +256,17 @@ type SubscriptionConfig struct {
// FAILED_PRECONDITION. If the subscription is a push subscription, pushes to
// the endpoint will not be made.
Detached bool

// TopicMessageRetentionDuration indicates the minimum duration for which a message is
// retained after it is published to the subscription's topic. If this field is
// set, messages published to the subscription's topic in the last
// `TopicMessageRetentionDuration` are always available to subscribers.
// You can enable both topic and subscription retention for the same topic.
// In this situation, the maximum of the retention durations takes effect.
//
// This is an output only field, meaning it will only appear in responses from the backend
// and will be ignored if sent in a request.
TopicMessageRetentionDuration time.Duration
noahdietz marked this conversation as resolved.
Show resolved Hide resolved
}

func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
Expand Down Expand Up @@ -304,17 +315,18 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC
dlp := protoToDLP(pbSub.DeadLetterPolicy)
rp := protoToRetryPolicy(pbSub.RetryPolicy)
subC := SubscriptionConfig{
Topic: newTopic(c, pbSub.Topic),
AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
RetainAckedMessages: pbSub.RetainAckedMessages,
RetentionDuration: rd,
Labels: pbSub.Labels,
ExpirationPolicy: expirationPolicy,
EnableMessageOrdering: pbSub.EnableMessageOrdering,
DeadLetterPolicy: dlp,
Filter: pbSub.Filter,
RetryPolicy: rp,
Detached: pbSub.Detached,
Topic: newTopic(c, pbSub.Topic),
AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
RetainAckedMessages: pbSub.RetainAckedMessages,
RetentionDuration: rd,
Labels: pbSub.Labels,
ExpirationPolicy: expirationPolicy,
EnableMessageOrdering: pbSub.EnableMessageOrdering,
DeadLetterPolicy: dlp,
Filter: pbSub.Filter,
RetryPolicy: rp,
Detached: pbSub.Detached,
TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(),
}
pc := protoToPushConfig(pbSub.PushConfig)
if pc != nil {
Expand Down
59 changes: 51 additions & 8 deletions pubsub/topic.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"cloud.google.com/go/iam"
"cloud.google.com/go/internal/optional"
ipubsub "cloud.google.com/go/internal/pubsub"
"cloud.google.com/go/pubsub/internal/scheduler"
gax "github.com/googleapis/gax-go/v2"
Expand All @@ -37,6 +38,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)

const (
Expand Down Expand Up @@ -143,13 +145,9 @@ func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error
// If the topic already exists, an error will be returned.
func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) {
t := c.Topic(topicID)
_, err := c.pubc.CreateTopic(ctx, &pb.Topic{
Name: t.name,
Labels: tc.Labels,
MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
KmsKeyName: tc.KMSKeyName,
SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
})
topic := tc.toProto()
topic.Name = t.name
_, err := c.pubc.CreateTopic(ctx, topic)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -200,6 +198,33 @@ type TopicConfig struct {
// Schema defines the schema settings upon topic creation. This cannot
// be modified after a topic has been created.
SchemaSettings *SchemaSettings

// RetentionDuration configures the minimum duration to retain a message
// after it is published to the topic. If this field is set, messages published
// to the topic in the last `RetentionDuration` are always available to subscribers.
// For instance, it allows any attached subscription to [seek to a
// timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
// that is up to `RetentionDuration` in the past. If this field is
// not set, message retention is controlled by settings on individual
// subscriptions. Cannot be more than 7 days or less than 10 minutes.
//
// For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention.
RetentionDuration optional.Duration
}

func (tc *TopicConfig) toProto() *pb.Topic {
var retDur *durationpb.Duration
if tc.RetentionDuration != nil {
retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration))
}
pbt := &pb.Topic{
Labels: tc.Labels,
MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
KmsKeyName: tc.KMSKeyName,
SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
MessageRetentionDuration: retDur,
}
return pbt
}

// TopicConfigToUpdate describes how to update a topic.
Expand All @@ -219,15 +244,24 @@ type TopicConfigToUpdate struct {
// This field has beta status. It is not subject to the stability guarantee
// and may change.
MessageStoragePolicy *MessageStoragePolicy

// If set to a positive duration between 10 minutes and 7 days, RetentionDuration is changed.
// If set to a negative value, this clears RetentionDuration from the topic.
// If nil, the retention duration remains unchanged.
RetentionDuration optional.Duration
}

func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
return TopicConfig{
tc := TopicConfig{
Labels: pbt.Labels,
MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
KMSKeyName: pbt.KmsKeyName,
SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings),
}
if pbt.GetMessageRetentionDuration() != nil {
tc.RetentionDuration = pbt.GetMessageRetentionDuration().AsDuration()
}
return tc
}

// DetachSubscriptionResult is the response for the DetachSubscription method.
Expand Down Expand Up @@ -316,6 +350,15 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy)
paths = append(paths, "message_storage_policy")
}
if cfg.RetentionDuration != nil {
r := optional.ToDuration(cfg.RetentionDuration)
pt.MessageRetentionDuration = durationpb.New(r)
if r < 0 {
// Clear MessageRetentionDuration if sentinel value is read.
pt.MessageRetentionDuration = nil
}
paths = append(paths, "message_retention_duration")
}
return &pb.UpdateTopicRequest{
Topic: pt,
UpdateMask: &fmpb.FieldMask{Paths: paths},
Expand Down
1 change: 1 addition & 0 deletions pubsub/topic_test.go
Expand Up @@ -88,6 +88,7 @@ func TestCreateTopicWithConfig(t *testing.T) {
Schema: "projects/P/schemas/S",
Encoding: EncodingJSON,
},
RetentionDuration: 5 * time.Hour,
}

topic := mustCreateTopicWithConfig(t, c, id, &want)
Expand Down