Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add topic message retention duration #4520

Merged
merged 19 commits into from Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 59 additions & 0 deletions pubsub/integration_test.go
Expand Up @@ -1979,3 +1979,62 @@ func TestIntegration_ValidateMessage(t *testing.T) {
})
}
}

func TestIntegration_TopicRetention(t *testing.T) {
ctx := context.Background()
c := integrationTestClient(ctx, t)
defer c.Close()

tc := TopicConfig{
RetentionDuration: 50 * time.Minute,
}
topic, err := c.CreateTopicWithConfig(ctx, topicIDs.New(), &tc)
if err != nil {
t.Fatal(err)
}

cfg, err := topic.Config(ctx)
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(cfg, tc) {
t.Fatalf("got: %v, want %v", cfg, tc)
}

newDur := 11 * time.Minute
cfg, err = topic.Update(ctx, TopicConfigToUpdate{
RetentionDuration: newDur,
})
if err != nil {
t.Fatal(err)
}
if got := cfg.RetentionDuration; got != newDur {
t.Fatalf("cfg.RetentionDuration, got: %v, want: %v", got, newDur)
}

// Create a subscription on the topic and read TopicMessageRetentionDuration.
s, err := c.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
})
if err != nil {
t.Fatal(err)
}
sCfg, err := s.Config(ctx)
if err != nil {
t.Fatal(err)
}
if got := sCfg.TopicMessageRetentionDuration; got != newDur {
t.Fatalf("sCfg.TopicMessageRetentionDuration, got: %v, want: %v", got, newDur)
}

// Clear retention duration by setting to a negative value.
cfg, err = topic.Update(ctx, TopicConfigToUpdate{
RetentionDuration: -1 * time.Minute,
})
if err != nil {
t.Fatal(err)
}
if got := cfg.RetentionDuration; got != nil {
t.Fatalf("expected cleared retention duration, got: %v", got)
}
}
35 changes: 24 additions & 11 deletions pubsub/subscription.go
Expand Up @@ -256,6 +256,18 @@ type SubscriptionConfig struct {
// FAILED_PRECONDITION. If the subscription is a push subscription, pushes to
// the endpoint will not be made.
Detached bool

// TopicMessageRetentionDuration is the subscription's topic's message
// retention duration populated by the server and is read only.bundledMessage
// Topic message retention indicates the minimum duration for which a message is
// retained after it is published to the subscription's topic. If this field is
// set, messages published to the subscription's topic in the last
// `TopicMessageRetentionDuration` are always available to subscribers.
// You can enable both topic and subscription retention for the same topic.
// In this situation, the maximum of the retention durations takes effect.
//
// This field is set only in responses from the server and otherwise ignored.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear: this is an "output only" field? I know this isn't a proto field, but just want to clarify with https://google.aip.dev/203#output-only

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is an output only field, correspondig to https://github.com/googleapis/googleapis/blob/ba30d8097582039ac4cc4e21b4e4baa426423075/google/pubsub/v1/pubsub.proto#L761.

Should I explicitly add this to the comments here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I explicitly add this to the comments here?

It's fine the way it is, but maybe it would be clearer to say This is an output only field, meaning it will only appear in responses from the backend and will be ignored if sent in a request. You could include the link too if you feel it would be helpful.

No change is really needed, just wanted to clarify for myself.

TopicMessageRetentionDuration time.Duration
noahdietz marked this conversation as resolved.
Show resolved Hide resolved
}

func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
Expand Down Expand Up @@ -304,17 +316,18 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC
dlp := protoToDLP(pbSub.DeadLetterPolicy)
rp := protoToRetryPolicy(pbSub.RetryPolicy)
subC := SubscriptionConfig{
Topic: newTopic(c, pbSub.Topic),
AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
RetainAckedMessages: pbSub.RetainAckedMessages,
RetentionDuration: rd,
Labels: pbSub.Labels,
ExpirationPolicy: expirationPolicy,
EnableMessageOrdering: pbSub.EnableMessageOrdering,
DeadLetterPolicy: dlp,
Filter: pbSub.Filter,
RetryPolicy: rp,
Detached: pbSub.Detached,
Topic: newTopic(c, pbSub.Topic),
AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
RetainAckedMessages: pbSub.RetainAckedMessages,
RetentionDuration: rd,
Labels: pbSub.Labels,
ExpirationPolicy: expirationPolicy,
EnableMessageOrdering: pbSub.EnableMessageOrdering,
DeadLetterPolicy: dlp,
Filter: pbSub.Filter,
RetryPolicy: rp,
Detached: pbSub.Detached,
TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(),
}
pc := protoToPushConfig(pbSub.PushConfig)
if pc != nil {
Expand Down
59 changes: 51 additions & 8 deletions pubsub/topic.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"cloud.google.com/go/iam"
"cloud.google.com/go/internal/optional"
ipubsub "cloud.google.com/go/internal/pubsub"
"cloud.google.com/go/pubsub/internal/scheduler"
gax "github.com/googleapis/gax-go/v2"
Expand All @@ -37,6 +38,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)

const (
Expand Down Expand Up @@ -143,13 +145,9 @@ func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error
// If the topic already exists, an error will be returned.
func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) {
t := c.Topic(topicID)
_, err := c.pubc.CreateTopic(ctx, &pb.Topic{
Name: t.name,
Labels: tc.Labels,
MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
KmsKeyName: tc.KMSKeyName,
SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
})
topic := tc.toProto()
topic.Name = t.name
_, err := c.pubc.CreateTopic(ctx, topic)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -200,6 +198,33 @@ type TopicConfig struct {
// Schema defines the schema settings upon topic creation. This cannot
// be modified after a topic has been created.
SchemaSettings *SchemaSettings

// RetentionDuration configures the minimum duration to retain a message
// after it is published to the topic. If this field is set, messages published
// to the topic in the last `message_retention_duration` are always available to subscribers.
lahuang4 marked this conversation as resolved.
Show resolved Hide resolved
// For instance, it allows any attached subscription to [seek to a
// timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
// that is up to `message_retention_duration` in the past. If this field is
// not set, message retention is controlled by settings on individual
// subscriptions. Cannot be more than 7 days or less than 10 minutes.
//
// For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_retention.
lahuang4 marked this conversation as resolved.
Show resolved Hide resolved
RetentionDuration optional.Duration
}

func (tc *TopicConfig) toProto() *pb.Topic {
var retDur *durationpb.Duration
if tc.RetentionDuration != nil {
retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration))
}
pbt := &pb.Topic{
Labels: tc.Labels,
MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
KmsKeyName: tc.KMSKeyName,
SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
MessageRetentionDuration: retDur,
}
return pbt
}

// TopicConfigToUpdate describes how to update a topic.
Expand All @@ -219,15 +244,24 @@ type TopicConfigToUpdate struct {
// This field has beta status. It is not subject to the stability guarantee
// and may change.
MessageStoragePolicy *MessageStoragePolicy

// If set to a positive duration between 10 minutes and 7 days, RetentionDuration is changed.
// If set to a negative value, this clears RetentionDuration from the topic.
// If nil, the retention duration remains unchanged.
RetentionDuration optional.Duration
}

func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
return TopicConfig{
tc := TopicConfig{
Labels: pbt.Labels,
MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
KMSKeyName: pbt.KmsKeyName,
SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings),
}
if pbt.MessageRetentionDuration != nil {
tc.RetentionDuration = pbt.MessageRetentionDuration.AsDuration()
noahdietz marked this conversation as resolved.
Show resolved Hide resolved
}
return tc
}

// DetachSubscriptionResult is the response for the DetachSubscription method.
Expand Down Expand Up @@ -316,6 +350,15 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy)
paths = append(paths, "message_storage_policy")
}
if cfg.RetentionDuration != nil {
r := optional.ToDuration(cfg.RetentionDuration)
pt.MessageRetentionDuration = durationpb.New(r)
if r <= 0 {
noahdietz marked this conversation as resolved.
Show resolved Hide resolved
// Clear MessageRetentionDuration.
pt.MessageRetentionDuration = nil
}
paths = append(paths, "message_retention_duration")
}
return &pb.UpdateTopicRequest{
Topic: pt,
UpdateMask: &fmpb.FieldMask{Paths: paths},
Expand Down
1 change: 1 addition & 0 deletions pubsub/topic_test.go
Expand Up @@ -88,6 +88,7 @@ func TestCreateTopicWithConfig(t *testing.T) {
Schema: "projects/P/schemas/S",
Encoding: EncodingJSON,
},
RetentionDuration: 5 * time.Hour,
}

topic := mustCreateTopicWithConfig(t, c, id, &want)
Expand Down