diff --git a/pubsublite/admin.go b/pubsublite/admin.go index 9804b369f92..f2939b81752 100644 --- a/pubsublite/admin.go +++ b/pubsublite/admin.go @@ -22,6 +22,7 @@ import ( vkit "cloud.google.com/go/pubsublite/apiv1" pb "cloud.google.com/go/pubsublite/apiv1/pubsublitepb" + fmpb "google.golang.org/genproto/protobuf/field_mask" ) var ( @@ -155,7 +156,7 @@ func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator } type createSubscriptionSettings struct { - backlogLocation BacklogLocation + target SeekTarget } // CreateSubscriptionOption is an option for AdminClient.CreateSubscription. @@ -163,27 +164,42 @@ type CreateSubscriptionOption interface { apply(*createSubscriptionSettings) } -type startingOffset struct { - backlogLocation BacklogLocation +type targetLocation struct { + target SeekTarget } -func (so startingOffset) apply(settings *createSubscriptionSettings) { - settings.backlogLocation = so.backlogLocation +func (tl targetLocation) apply(settings *createSubscriptionSettings) { + settings.target = tl.target } // StartingOffset specifies the offset at which a newly created subscription // will start receiving messages. +// +// Deprecated. This is equivalent to calling AtTargetLocation with a +// BacklogLocation and will be removed in the next major version. func StartingOffset(location BacklogLocation) CreateSubscriptionOption { - return startingOffset{location} + return targetLocation{location} +} + +// AtTargetLocation specifies the target location within the message backlog +// that a new subscription should be initialized to. +// +// An additional seek request is initiated if the target location is a publish +// or event time. If the seek fails, the created subscription is not deleted. +func AtTargetLocation(target SeekTarget) CreateSubscriptionOption { + return targetLocation{target} } // CreateSubscription creates a new subscription from the given config. If the // subscription already exists an error will be returned. // // By default, a new subscription will only receive messages published after -// the subscription was created. Use StartingOffset to override. +// the subscription was created. Use AtTargetLocation to initialize the +// subscription to another location within the message backlog. func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig, opts ...CreateSubscriptionOption) (*SubscriptionConfig, error) { - var settings createSubscriptionSettings + settings := createSubscriptionSettings{ + target: End, + } for _, opt := range opts { opt.apply(&settings) } @@ -195,16 +211,53 @@ func (ac *AdminClient) CreateSubscription(ctx context.Context, config Subscripti if _, err := wire.ParseTopicPath(config.Topic); err != nil { return nil, err } - req := &pb.CreateSubscriptionRequest{ + + var skipBacklog, requiresSeek, requiresUpdate bool + switch settings.target.(type) { + case PublishTime, EventTime: + requiresSeek = true + case BacklogLocation: + skipBacklog = settings.target.(BacklogLocation) == End + } + + // Request 1 - create the subscription. + createReq := &pb.CreateSubscriptionRequest{ Parent: subsPath.LocationPath().String(), Subscription: config.toProto(), SubscriptionId: subsPath.SubscriptionID, - SkipBacklog: settings.backlogLocation != Beginning, + SkipBacklog: skipBacklog, + } + if requiresSeek && createReq.Subscription.GetExportConfig().GetDesiredState() == pb.ExportConfig_ACTIVE { + // Export subscriptions must be paused while seeking. The state is later + // updated to active. + requiresUpdate = true + createReq.Subscription.ExportConfig.DesiredState = pb.ExportConfig_PAUSED } - subspb, err := ac.admin.CreateSubscription(ctx, req) + subspb, err := ac.admin.CreateSubscription(ctx, createReq) if err != nil { return nil, err } + + // Request 2 (optional) - seek the subscription. + if requiresSeek { + if _, err = ac.SeekSubscription(ctx, subsPath.String(), settings.target); err != nil { + return nil, err + } + } + + // Request 3 (optional) - make the export subscription active. + if requiresUpdate { + updateReq := &pb.UpdateSubscriptionRequest{ + Subscription: &pb.Subscription{ + Name: subsPath.String(), + ExportConfig: &pb.ExportConfig{DesiredState: pb.ExportConfig_ACTIVE}, + }, + UpdateMask: &fmpb.FieldMask{Paths: []string{"export_config.desired_state"}}, + } + if subspb, err = ac.admin.UpdateSubscription(ctx, updateReq); err != nil { + return nil, err + } + } return protoToSubscriptionConfig(subspb), nil } diff --git a/pubsublite/admin_test.go b/pubsublite/admin_test.go index d044a732909..15a83863295 100644 --- a/pubsublite/admin_test.go +++ b/pubsublite/admin_test.go @@ -29,6 +29,7 @@ import ( tspb "github.com/golang/protobuf/ptypes/timestamp" lrpb "google.golang.org/genproto/googleapis/longrunning" statuspb "google.golang.org/genproto/googleapis/rpc/status" + fmpb "google.golang.org/genproto/protobuf/field_mask" ) func newTestAdminClient(t *testing.T) *AdminClient { @@ -350,6 +351,281 @@ func TestAdminSubscriptionCRUD(t *testing.T) { } } +func TestAdminCreateSubscriptionAtTargetLocation(t *testing.T) { + const locationPath = "projects/my-proj/locations/us-central1-a" + const subscription = "my-subscription" + const topicPath = "projects/my-proj/locations/us-central1-a/topics/my-topic" + const subscriptionPath = "projects/my-proj/locations/us-central1-a/subscriptions/my-subscription" + const exportDestinationPath = "projects/my-proj/topics/destination-topic" + standardSubscription := SubscriptionConfig{ + Name: subscriptionPath, + Topic: topicPath, + DeliveryRequirement: DeliverImmediately, + } + activeExportSubscription := SubscriptionConfig{ + Name: subscriptionPath, + Topic: topicPath, + DeliveryRequirement: DeliverImmediately, + ExportConfig: &ExportConfig{ + DesiredState: ExportActive, + Destination: &PubSubDestinationConfig{Topic: exportDestinationPath}, + }, + } + pausedExportSubscription := SubscriptionConfig{ + Name: subscriptionPath, + Topic: topicPath, + DeliveryRequirement: DeliverImmediately, + ExportConfig: &ExportConfig{ + DesiredState: ExportPaused, + Destination: &PubSubDestinationConfig{Topic: exportDestinationPath}, + }, + } + + timestamp := time.Unix(1234, 0) + wantSeekToPublishTimeReq := &pb.SeekSubscriptionRequest{ + Name: subscriptionPath, + Target: &pb.SeekSubscriptionRequest_TimeTarget{ + TimeTarget: &pb.TimeTarget{ + Time: &pb.TimeTarget_PublishTime{ + PublishTime: &tspb.Timestamp{Seconds: 1234}, + }, + }, + }, + } + wantSeekToEventTimeReq := &pb.SeekSubscriptionRequest{ + Name: subscriptionPath, + Target: &pb.SeekSubscriptionRequest_TimeTarget{ + TimeTarget: &pb.TimeTarget{ + Time: &pb.TimeTarget_EventTime{ + EventTime: &tspb.Timestamp{Seconds: 1234}, + }, + }, + }, + } + wantUpdateReq := &pb.UpdateSubscriptionRequest{ + Subscription: &pb.Subscription{ + Name: subscriptionPath, + ExportConfig: &pb.ExportConfig{DesiredState: pb.ExportConfig_ACTIVE}, + }, + UpdateMask: &fmpb.FieldMask{ + Paths: []string{"export_config.desired_state"}, + }, + } + + createErr := status.Error(codes.InvalidArgument, "invalid") + seekErr := status.Error(codes.FailedPrecondition, "failed") + updateErr := status.Error(codes.PermissionDenied, "permission") + + ctx := context.Background() + admin := newTestAdminClient(t) + defer admin.Close() + + for _, tc := range []struct { + desc string + target SeekTarget + inputConfig SubscriptionConfig + addExpectedRequests func(*test.RPCVerifier) + wantConfig *SubscriptionConfig + wantErr error + }{ + { + desc: "Standard subscription at beginning success", + target: Beginning, + inputConfig: standardSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: standardSubscription.toProto(), + SkipBacklog: false, + }, standardSubscription.toProto(), nil) + }, + wantConfig: &standardSubscription, + }, + { + desc: "Standard subscription at end error", + target: End, + inputConfig: standardSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: standardSubscription.toProto(), + SkipBacklog: true, + }, nil, createErr) + }, + wantErr: createErr, + }, + { + desc: "Standard subscription at publish time success", + target: PublishTime(timestamp), + inputConfig: standardSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: standardSubscription.toProto(), + SkipBacklog: false, + }, standardSubscription.toProto(), nil) + verifier.Push(wantSeekToPublishTimeReq, &lrpb.Operation{}, nil) + }, + wantConfig: &standardSubscription, + }, + { + desc: "Standard subscription at event time create error", + target: EventTime(timestamp), + inputConfig: standardSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: standardSubscription.toProto(), + SkipBacklog: false, + }, nil, createErr) + }, + wantErr: createErr, + }, + { + desc: "Standard subscription at event time seek error", + target: EventTime(timestamp), + inputConfig: standardSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: standardSubscription.toProto(), + SkipBacklog: false, + }, standardSubscription.toProto(), nil) + verifier.Push(wantSeekToEventTimeReq, nil, seekErr) + }, + wantErr: seekErr, + }, + { + desc: "Active export subscription at beginning success", + target: Beginning, + inputConfig: activeExportSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: activeExportSubscription.toProto(), + SkipBacklog: false, + }, activeExportSubscription.toProto(), nil) + }, + wantConfig: &activeExportSubscription, + }, + { + desc: "Paused export subscription at end success", + target: End, + inputConfig: pausedExportSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: pausedExportSubscription.toProto(), + SkipBacklog: true, + }, pausedExportSubscription.toProto(), nil) + }, + wantConfig: &pausedExportSubscription, + }, + { + desc: "Paused export subscription at publish time success", + target: PublishTime(timestamp), + inputConfig: pausedExportSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: pausedExportSubscription.toProto(), + SkipBacklog: false, + }, pausedExportSubscription.toProto(), nil) + verifier.Push(wantSeekToPublishTimeReq, &lrpb.Operation{}, nil) + }, + wantConfig: &pausedExportSubscription, + }, + { + desc: "Active export subscription at event time success", + target: EventTime(timestamp), + inputConfig: activeExportSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + // Created in paused state. + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: pausedExportSubscription.toProto(), + SkipBacklog: false, + }, pausedExportSubscription.toProto(), nil) + verifier.Push(wantSeekToEventTimeReq, &lrpb.Operation{}, nil) + verifier.Push(wantUpdateReq, activeExportSubscription.toProto(), nil) + }, + wantConfig: &activeExportSubscription, + }, + { + desc: "Active export subscription at event time create error", + target: EventTime(timestamp), + inputConfig: activeExportSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + // Created in paused state. + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: pausedExportSubscription.toProto(), + SkipBacklog: false, + }, nil, createErr) + }, + wantErr: createErr, + }, + { + desc: "Active export subscription at event time seek error", + target: EventTime(timestamp), + inputConfig: activeExportSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + // Created in paused state. + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: pausedExportSubscription.toProto(), + SkipBacklog: false, + }, pausedExportSubscription.toProto(), nil) + verifier.Push(wantSeekToEventTimeReq, nil, seekErr) + }, + wantErr: seekErr, + }, + { + desc: "Active export subscription at event time update error", + target: EventTime(timestamp), + inputConfig: activeExportSubscription, + addExpectedRequests: func(verifier *test.RPCVerifier) { + // Created in paused state. + verifier.Push(&pb.CreateSubscriptionRequest{ + Parent: locationPath, + SubscriptionId: subscription, + Subscription: pausedExportSubscription.toProto(), + SkipBacklog: false, + }, pausedExportSubscription.toProto(), nil) + verifier.Push(wantSeekToEventTimeReq, &lrpb.Operation{}, nil) + verifier.Push(wantUpdateReq, nil, updateErr) + }, + wantErr: updateErr, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + verifiers := test.NewVerifiers(t) + tc.addExpectedRequests(verifiers.GlobalVerifier) + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + gotConfig, err := admin.CreateSubscription(ctx, tc.inputConfig, AtTargetLocation(tc.target)) + if diff := testutil.Diff(gotConfig, tc.wantConfig); diff != "" { + t.Errorf("CreateSubscription() got: -, want: +\n%s", diff) + } + if !test.ErrorEqual(err, tc.wantErr) { + t.Errorf("CreateSubscription() got err: (%v), want err: (%v)", err, tc.wantErr) + } + }) + } +} + func TestAdminListSubscriptions(t *testing.T) { ctx := context.Background() diff --git a/pubsublite/config.go b/pubsublite/config.go index 3ea6793f877..6baf75ec385 100644 --- a/pubsublite/config.go +++ b/pubsublite/config.go @@ -211,6 +211,105 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest { } } +// ExportDestinationConfig is the configuration for exporting to a destination. +// Implemented by *PubSubDestinationConfig. +type ExportDestinationConfig interface { + setExportConfig(ec *pb.ExportConfig) string +} + +// PubSubDestinationConfig configures messages to be exported to a Pub/Sub +// topic. Implements the ExportDestinationConfig interface. +// +// See https://cloud.google.com/pubsub/lite/docs/export-pubsub for more +// information about how export subscriptions to Pub/Sub are configured. +type PubSubDestinationConfig struct { + // The path of a Pub/Sub topic, in the format: + // "projects/PROJECT_ID/topics/TOPIC_ID". + Topic string +} + +func (pc *PubSubDestinationConfig) setExportConfig(ec *pb.ExportConfig) string { + ec.Destination = &pb.ExportConfig_PubsubConfig{ + PubsubConfig: &pb.ExportConfig_PubSubConfig{Topic: pc.Topic}, + } + return "export_config.pubsub_config" +} + +// ExportState specifies the desired state of an export subscription. +type ExportState int + +const ( + // UnspecifiedExportState represents an unset export state. + UnspecifiedExportState ExportState = iota + + // ExportActive specifies that export processing should be enabled. + ExportActive + + // ExportPaused specifies that export processing should be suspended. + ExportPaused + + // ExportPermissionDenied specifies that messages cannot be exported due to + // permission denied errors. Output only. + ExportPermissionDenied + + // ExportResourceNotFound specifies that messages cannot be exported due to + // missing resources. Output only. + ExportResourceNotFound +) + +// ExportConfig describes the properties of a Pub/Sub Lite export subscription, +// which configures the service to write messages to a destination. +type ExportConfig struct { + // The desired state of this export subscription. This should only be set to + // ExportActive or ExportPaused. + DesiredState ExportState + + // This is an output only field that reports the current export state. It is + // ignored if set in any requests. + CurrentState ExportState + + // The path of an optional Pub/Sub Lite topic to receive messages that cannot + // be exported to the destination, in the format: + // "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID". + // Must be within the same project and location as the subscription. + DeadLetterTopic string + + // The destination to export messages to. + Destination ExportDestinationConfig +} + +func (ec *ExportConfig) toProto() *pb.ExportConfig { + if ec == nil { + return nil + } + epb := &pb.ExportConfig{ + DeadLetterTopic: ec.DeadLetterTopic, + // Note: Assumes enum values match API proto. + DesiredState: pb.ExportConfig_State(ec.DesiredState), + CurrentState: pb.ExportConfig_State(ec.CurrentState), + } + if ec.Destination != nil { + ec.Destination.setExportConfig(epb) + } + return epb +} + +func protoToExportConfig(epb *pb.ExportConfig) *ExportConfig { + if epb == nil { + return nil + } + ec := &ExportConfig{ + DeadLetterTopic: epb.GetDeadLetterTopic(), + // Note: Assumes enum values match API proto. + DesiredState: ExportState(epb.GetDesiredState().Number()), + CurrentState: ExportState(epb.GetCurrentState().Number()), + } + if ps := epb.GetPubsubConfig(); ps != nil { + ec.Destination = &PubSubDestinationConfig{Topic: ps.Topic} + } + return ec +} + // DeliveryRequirement specifies when a subscription should send messages to // subscribers relative to persistence in storage. type DeliveryRequirement int @@ -254,12 +353,18 @@ type SubscriptionConfig struct { // Whether a message should be delivered to subscribers immediately after it // has been published or after it has been successfully written to storage. DeliveryRequirement DeliveryRequirement + + // If non-nil, configures this subscription to export messages from the + // associated topic to a destination. The ExportConfig cannot be removed after + // creation of the subscription, however its properties can be changed. + ExportConfig *ExportConfig } func (sc *SubscriptionConfig) toProto() *pb.Subscription { subspb := &pb.Subscription{ - Name: sc.Name, - Topic: sc.Topic, + Name: sc.Name, + Topic: sc.Topic, + ExportConfig: sc.ExportConfig.toProto(), } if sc.DeliveryRequirement > 0 { subspb.DeliveryConfig = &pb.Subscription_DeliveryConfig{ @@ -276,9 +381,50 @@ func protoToSubscriptionConfig(s *pb.Subscription) *SubscriptionConfig { Topic: s.GetTopic(), // Note: Assumes DeliveryRequirement enum values match API proto. DeliveryRequirement: DeliveryRequirement(s.GetDeliveryConfig().GetDeliveryRequirement().Number()), + ExportConfig: protoToExportConfig(s.GetExportConfig()), } } +// ExportConfigToUpdate specifies the properties to update for an export +// subscription. +type ExportConfigToUpdate struct { + // If non-zero, updates the desired state. This should only be set to + // ExportActive or ExportPaused. + DesiredState ExportState + + // The path of an optional Pub/Sub Lite topic to receive messages that cannot + // be exported to the destination, in the format: + // "projects/PROJECT_ID/locations/LOCATION/topics/TOPIC_ID". + // Must be within the same project and location as the subscription. + DeadLetterTopic optional.String + + // If non-nil, updates the export destination configuration. + Destination ExportDestinationConfig +} + +func (ec *ExportConfigToUpdate) toUpdateRequest() (*pb.ExportConfig, []string) { + if ec == nil { + return nil, nil + } + var fields []string + updatedExport := &pb.ExportConfig{ + // Note: Assumes enum values match API proto. + DesiredState: pb.ExportConfig_State(ec.DesiredState), + } + if ec.DesiredState > 0 { + fields = append(fields, "export_config.desired_state") + } + if ec.Destination != nil { + destinationField := ec.Destination.setExportConfig(updatedExport) + fields = append(fields, destinationField) + } + if ec.DeadLetterTopic != nil { + updatedExport.DeadLetterTopic = optional.ToString(ec.DeadLetterTopic) + fields = append(fields, "export_config.dead_letter_topic") + } + return updatedExport, fields +} + // SubscriptionConfigToUpdate specifies the properties to update for a // subscription. type SubscriptionConfigToUpdate struct { @@ -289,18 +435,21 @@ type SubscriptionConfigToUpdate struct { // If non-zero, updates the message delivery requirement. DeliveryRequirement DeliveryRequirement + + // If non-nil, updates export config properties. + ExportConfig *ExportConfigToUpdate } func (sc *SubscriptionConfigToUpdate) toUpdateRequest() *pb.UpdateSubscriptionRequest { + exportConfig, fields := sc.ExportConfig.toUpdateRequest() updatedSubs := &pb.Subscription{ Name: sc.Name, DeliveryConfig: &pb.Subscription_DeliveryConfig{ // Note: Assumes DeliveryRequirement enum values match API proto. DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement), }, + ExportConfig: exportConfig, } - - var fields []string if sc.DeliveryRequirement > 0 { fields = append(fields, "delivery_config.delivery_requirement") } diff --git a/pubsublite/config_test.go b/pubsublite/config_test.go index 34484334eb5..e3775f48c73 100644 --- a/pubsublite/config_test.go +++ b/pubsublite/config_test.go @@ -264,6 +264,58 @@ func TestSubscriptionConfigToProtoConversion(t *testing.T) { DeliveryRequirement: UnspecifiedDeliveryRequirement, }, }, + { + desc: "minimal export config", + subspb: &pb.Subscription{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + Topic: "projects/my-proj/locations/us-central1-c/topics/my-topic", + DeliveryConfig: &pb.Subscription_DeliveryConfig{ + DeliveryRequirement: pb.Subscription_DeliveryConfig_DELIVER_AFTER_STORED, + }, + ExportConfig: &pb.ExportConfig{}, + }, + wantConfig: &SubscriptionConfig{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + Topic: "projects/my-proj/locations/us-central1-c/topics/my-topic", + DeliveryRequirement: DeliverAfterStored, + ExportConfig: &ExportConfig{ + DesiredState: UnspecifiedExportState, + }, + }, + }, + { + desc: "pubsub export config", + subspb: &pb.Subscription{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + Topic: "projects/my-proj/locations/us-central1-c/topics/my-topic", + DeliveryConfig: &pb.Subscription_DeliveryConfig{ + DeliveryRequirement: pb.Subscription_DeliveryConfig_DELIVER_AFTER_STORED, + }, + ExportConfig: &pb.ExportConfig{ + DesiredState: pb.ExportConfig_ACTIVE, + CurrentState: pb.ExportConfig_NOT_FOUND, + DeadLetterTopic: "projects/my-proj/locations/us-central1-c/topics/dead-letter-topic", + Destination: &pb.ExportConfig_PubsubConfig{ + PubsubConfig: &pb.ExportConfig_PubSubConfig{ + Topic: "projects/my-proj/topics/destination-topic", + }, + }, + }, + }, + wantConfig: &SubscriptionConfig{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + Topic: "projects/my-proj/locations/us-central1-c/topics/my-topic", + DeliveryRequirement: DeliverAfterStored, + ExportConfig: &ExportConfig{ + DesiredState: ExportActive, + CurrentState: ExportResourceNotFound, + DeadLetterTopic: "projects/my-proj/locations/us-central1-c/topics/dead-letter-topic", + Destination: &PubSubDestinationConfig{ + Topic: "projects/my-proj/topics/destination-topic", + }, + }, + }, + }, } { t.Run(tc.desc, func(t *testing.T) { gotConfig := protoToSubscriptionConfig(tc.subspb) @@ -290,6 +342,13 @@ func TestSubscriptionUpdateRequest(t *testing.T) { config: &SubscriptionConfigToUpdate{ Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", DeliveryRequirement: DeliverImmediately, + ExportConfig: &ExportConfigToUpdate{ + DesiredState: ExportPaused, + DeadLetterTopic: "projects/my-proj/topics/updated-dead-letter", + Destination: &PubSubDestinationConfig{ + Topic: "projects/my-proj/topics/updated-destination", + }, + }, }, want: &pb.UpdateSubscriptionRequest{ Subscription: &pb.Subscription{ @@ -297,14 +356,43 @@ func TestSubscriptionUpdateRequest(t *testing.T) { DeliveryConfig: &pb.Subscription_DeliveryConfig{ DeliveryRequirement: pb.Subscription_DeliveryConfig_DELIVER_IMMEDIATELY, }, + ExportConfig: &pb.ExportConfig{ + DesiredState: pb.ExportConfig_PAUSED, + DeadLetterTopic: "projects/my-proj/topics/updated-dead-letter", + Destination: &pb.ExportConfig_PubsubConfig{ + PubsubConfig: &pb.ExportConfig_PubSubConfig{ + Topic: "projects/my-proj/topics/updated-destination", + }, + }, + }, }, UpdateMask: &fmpb.FieldMask{ Paths: []string{ + "export_config.desired_state", + "export_config.pubsub_config", + "export_config.dead_letter_topic", "delivery_config.delivery_requirement", }, }, }, }, + { + desc: "clear dead letter topic", + config: &SubscriptionConfigToUpdate{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + ExportConfig: &ExportConfigToUpdate{DeadLetterTopic: ""}, + }, + want: &pb.UpdateSubscriptionRequest{ + Subscription: &pb.Subscription{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + DeliveryConfig: &pb.Subscription_DeliveryConfig{}, + ExportConfig: &pb.ExportConfig{DeadLetterTopic: ""}, + }, + UpdateMask: &fmpb.FieldMask{ + Paths: []string{"export_config.dead_letter_topic"}, + }, + }, + }, { desc: "no fields set", config: &SubscriptionConfigToUpdate{ @@ -318,6 +406,21 @@ func TestSubscriptionUpdateRequest(t *testing.T) { UpdateMask: &fmpb.FieldMask{}, }, }, + { + desc: "no export config fields set", + config: &SubscriptionConfigToUpdate{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + ExportConfig: &ExportConfigToUpdate{}, + }, + want: &pb.UpdateSubscriptionRequest{ + Subscription: &pb.Subscription{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + DeliveryConfig: &pb.Subscription_DeliveryConfig{}, + ExportConfig: &pb.ExportConfig{}, + }, + UpdateMask: &fmpb.FieldMask{}, + }, + }, } { t.Run(tc.desc, func(t *testing.T) { if got := tc.config.toUpdateRequest(); !proto.Equal(got, tc.want) { diff --git a/pubsublite/example_test.go b/pubsublite/example_test.go index 0e6afe6a1e5..0e939c44005 100644 --- a/pubsublite/example_test.go +++ b/pubsublite/example_test.go @@ -161,6 +161,70 @@ func ExampleAdminClient_CreateSubscription() { } } +// This example demonstrates how to create a new subscription initialized to a +// specified target location within the message backlog. The target location can +// be a BacklogLocation, PublishTime or EventTime. +func ExampleAdminClient_CreateSubscription_atTargetLocation() { + ctx := context.Background() + // NOTE: resources must be located within this region. + admin, err := pubsublite.NewAdminClient(ctx, "region") + if err != nil { + // TODO: Handle error. + } + defer admin.Close() + + subscriptionConfig := pubsublite.SubscriptionConfig{ + Name: "projects/my-project/locations/region-or-zone/subscriptions/my-subscription", + Topic: "projects/my-project/locations/region-or-zone/topics/my-topic", + // Do not wait for a published message to be successfully written to storage + // before delivering it to subscribers. + DeliveryRequirement: pubsublite.DeliverImmediately, + } + // Initialize the subscription to the oldest retained messages for each + // partition. + targetLocation := pubsublite.AtTargetLocation(pubsublite.Beginning) + _, err = admin.CreateSubscription(ctx, subscriptionConfig, targetLocation) + if err != nil { + // TODO: Handle error. + } +} + +// This example demonstrates how to create a new subscription that exports +// messages to a Pub/Sub topic. +// See https://cloud.google.com/pubsub/lite/docs/export-pubsub for more +// information about how export subscriptions are configured. +func ExampleAdminClient_CreateSubscription_exportToPubSub() { + ctx := context.Background() + // NOTE: resources must be located within this region. + admin, err := pubsublite.NewAdminClient(ctx, "region") + if err != nil { + // TODO: Handle error. + } + defer admin.Close() + + subscriptionConfig := pubsublite.SubscriptionConfig{ + Name: "projects/my-project/locations/region-or-zone/subscriptions/my-subscription", + Topic: "projects/my-project/locations/region-or-zone/topics/my-topic", + // Deliver a published message to subscribers after it has been successfully + // written to storage. + DeliveryRequirement: pubsublite.DeliverAfterStored, + ExportConfig: &pubsublite.ExportConfig{ + DesiredState: pubsublite.ExportActive, + // Configure an export subscription to a Pub/Sub topic. + Destination: &pubsublite.PubSubDestinationConfig{ + Topic: "projects/my-project/topics/destination-pubsub-topic", + }, + // Optional Lite topic to receive messages that cannot be exported to the + // destination. + DeadLetterTopic: "projects/my-project/locations/region-or-zone/topics/dead-letter-topic", + }, + } + _, err = admin.CreateSubscription(ctx, subscriptionConfig) + if err != nil { + // TODO: Handle error. + } +} + func ExampleAdminClient_UpdateSubscription() { ctx := context.Background() // NOTE: resources must be located within this region. diff --git a/pubsublite/integration_test.go b/pubsublite/integration_test.go index e8be64a194f..ba7dd996649 100644 --- a/pubsublite/integration_test.go +++ b/pubsublite/integration_test.go @@ -21,13 +21,14 @@ import ( "cloud.google.com/go/internal/testutil" "cloud.google.com/go/internal/uid" + "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsublite/internal/test" "cloud.google.com/go/pubsublite/internal/wire" - "google.golang.org/api/cloudresourcemanager/v1" "google.golang.org/api/iterator" "google.golang.org/api/option" vkit "cloud.google.com/go/pubsublite/apiv1" + cloudresourcemanager "google.golang.org/api/cloudresourcemanager/v1" ) const gibi = 1 << 30 @@ -86,6 +87,19 @@ func adminClient(ctx context.Context, t *testing.T, region string, opts ...optio return admin } +func pubsubClient(ctx context.Context, t *testing.T, opts ...option.ClientOption) *pubsub.Client { + ts := testutil.TokenSource(ctx, vkit.DefaultAuthScopes()...) + if ts == nil { + t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") + } + opts = append(withGRPCHeadersAssertion(t, option.WithTokenSource(ts)), opts...) + client, err := pubsub.NewClient(ctx, testutil.ProjID()) + if err != nil { + t.Fatalf("Failed to create pubsub client: %v", err) + } + return client +} + func cleanUpReservation(ctx context.Context, t *testing.T, admin *AdminClient, name string) { if err := admin.DeleteReservation(ctx, name); err != nil { t.Errorf("Failed to delete reservation %s: %v", name, err) @@ -98,6 +112,12 @@ func cleanUpTopic(ctx context.Context, t *testing.T, admin *AdminClient, name st } } +func cleanUpPubsubTopic(ctx context.Context, t *testing.T, topic *pubsub.Topic) { + if err := topic.Delete(ctx); err != nil { + t.Errorf("Failed to delete pubsub topic %s: %v", topic, err) + } +} + func cleanUpSubscription(ctx context.Context, t *testing.T, admin *AdminClient, name string) { if err := admin.DeleteSubscription(ctx, name); err != nil { t.Errorf("Failed to delete subscription %s: %v", name, err) @@ -141,7 +161,9 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { locationPath := wire.LocationPath{Project: proj, Location: zone}.String() topicPath := wire.TopicPath{Project: proj, Location: zone, TopicID: resourceID}.String() + pubsubTopicPath := fmt.Sprintf("projects/%s/topics/%s", proj, resourceID) subscriptionPath := wire.SubscriptionPath{Project: proj, Location: zone, SubscriptionID: resourceID}.String() + exportSubscriptionPath := wire.SubscriptionPath{Project: proj, Location: zone, SubscriptionID: resourceID + "export"}.String() reservationPath := wire.ReservationPath{Project: proj, Region: region, ReservationID: resourceID}.String() t.Logf("Topic path: %s", topicPath) @@ -386,4 +408,62 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { } else { validateNewSeekOperation(t, subscriptionPath, seekOp) } + + // Create an export subscription to a Pub/Sub topic. + client := pubsubClient(ctx, t) + defer client.Close() + pubsubTopic, err := client.CreateTopic(ctx, resourceID) + if err != nil { + t.Fatalf("Failed to create pubsub topic: %v", err) + } + defer cleanUpPubsubTopic(ctx, t, pubsubTopic) + t.Logf("Pub/Sub topic: %s", pubsubTopic) + + newExportSubsConfig := &SubscriptionConfig{ + Name: exportSubscriptionPath, + Topic: topicPath, + DeliveryRequirement: DeliverImmediately, + ExportConfig: &ExportConfig{ + DesiredState: ExportActive, + CurrentState: ExportActive, + Destination: &PubSubDestinationConfig{Topic: pubsubTopicPath}, + }, + } + + gotExportSubsConfig, err := admin.CreateSubscription(ctx, *newExportSubsConfig, AtTargetLocation(PublishTime(time.Now()))) + if err != nil { + t.Fatalf("Failed to create export subscription: %v", err) + } + defer cleanUpSubscription(ctx, t, admin, exportSubscriptionPath) + if diff := testutil.Diff(gotExportSubsConfig, newExportSubsConfig); diff != "" { + t.Errorf("CreateSubscription() got: -, want: +\n%s", diff) + } + + if gotExportSubsConfig, err := admin.Subscription(ctx, exportSubscriptionPath); err != nil { + t.Errorf("Failed to get export subscription: %v", err) + } else if diff := testutil.Diff(gotExportSubsConfig, newExportSubsConfig); diff != "" { + t.Errorf("Subscription() got: -, want: +\n%s", diff) + } + + exportSubsUpdate := SubscriptionConfigToUpdate{ + Name: exportSubscriptionPath, + ExportConfig: &ExportConfigToUpdate{ + DesiredState: ExportPaused, + }, + } + wantUpdatedExportSubsConfig := &SubscriptionConfig{ + Name: exportSubscriptionPath, + Topic: topicPath, + DeliveryRequirement: DeliverImmediately, + ExportConfig: &ExportConfig{ + DesiredState: ExportPaused, + CurrentState: ExportPaused, + Destination: &PubSubDestinationConfig{Topic: pubsubTopicPath}, + }, + } + if gotExportSubsConfig, err := admin.UpdateSubscription(ctx, exportSubsUpdate); err != nil { + t.Errorf("Failed to update export subscription: %v", err) + } else if diff := testutil.Diff(gotExportSubsConfig, wantUpdatedExportSubsConfig); diff != "" { + t.Errorf("UpdateSubscription() got: -, want: +\n%s", diff) + } }