Skip to content

Commit

Permalink
feat(pubsublite): create/update export subscriptions (#6885)
Browse files Browse the repository at this point in the history
Added support for creating and updating export subscriptions, which write messages to a configured destination, for example a Pub/Sub topic. More destination types may be supported in the future.

Added support for creating subscriptions initialized to a target location within the message backlog. A seek is performed for publish and event timestamps.
  • Loading branch information
tmdiep committed Dec 10, 2022
1 parent 176f533 commit 5fa8555
Show file tree
Hide file tree
Showing 6 changed files with 741 additions and 16 deletions.
75 changes: 64 additions & 11 deletions pubsublite/admin.go
Expand Up @@ -22,6 +22,7 @@ import (

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "cloud.google.com/go/pubsublite/apiv1/pubsublitepb"
fmpb "google.golang.org/genproto/protobuf/field_mask"
)

var (
Expand Down Expand Up @@ -155,35 +156,50 @@ func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator
}

type createSubscriptionSettings struct {
backlogLocation BacklogLocation
target SeekTarget
}

// CreateSubscriptionOption is an option for AdminClient.CreateSubscription.
type CreateSubscriptionOption interface {
apply(*createSubscriptionSettings)
}

type startingOffset struct {
backlogLocation BacklogLocation
type targetLocation struct {
target SeekTarget
}

func (so startingOffset) apply(settings *createSubscriptionSettings) {
settings.backlogLocation = so.backlogLocation
func (tl targetLocation) apply(settings *createSubscriptionSettings) {
settings.target = tl.target
}

// StartingOffset specifies the offset at which a newly created subscription
// will start receiving messages.
//
// Deprecated. This is equivalent to calling AtTargetLocation with a
// BacklogLocation and will be removed in the next major version.
func StartingOffset(location BacklogLocation) CreateSubscriptionOption {
return startingOffset{location}
return targetLocation{location}
}

// AtTargetLocation specifies the target location within the message backlog
// that a new subscription should be initialized to.
//
// An additional seek request is initiated if the target location is a publish
// or event time. If the seek fails, the created subscription is not deleted.
func AtTargetLocation(target SeekTarget) CreateSubscriptionOption {
return targetLocation{target}
}

// CreateSubscription creates a new subscription from the given config. If the
// subscription already exists an error will be returned.
//
// By default, a new subscription will only receive messages published after
// the subscription was created. Use StartingOffset to override.
// the subscription was created. Use AtTargetLocation to initialize the
// subscription to another location within the message backlog.
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig, opts ...CreateSubscriptionOption) (*SubscriptionConfig, error) {
var settings createSubscriptionSettings
settings := createSubscriptionSettings{
target: End,
}
for _, opt := range opts {
opt.apply(&settings)
}
Expand All @@ -195,16 +211,53 @@ func (ac *AdminClient) CreateSubscription(ctx context.Context, config Subscripti
if _, err := wire.ParseTopicPath(config.Topic); err != nil {
return nil, err
}
req := &pb.CreateSubscriptionRequest{

var skipBacklog, requiresSeek, requiresUpdate bool
switch settings.target.(type) {
case PublishTime, EventTime:
requiresSeek = true
case BacklogLocation:
skipBacklog = settings.target.(BacklogLocation) == End
}

// Request 1 - create the subscription.
createReq := &pb.CreateSubscriptionRequest{
Parent: subsPath.LocationPath().String(),
Subscription: config.toProto(),
SubscriptionId: subsPath.SubscriptionID,
SkipBacklog: settings.backlogLocation != Beginning,
SkipBacklog: skipBacklog,
}
if requiresSeek && createReq.Subscription.GetExportConfig().GetDesiredState() == pb.ExportConfig_ACTIVE {
// Export subscriptions must be paused while seeking. The state is later
// updated to active.
requiresUpdate = true
createReq.Subscription.ExportConfig.DesiredState = pb.ExportConfig_PAUSED
}
subspb, err := ac.admin.CreateSubscription(ctx, req)
subspb, err := ac.admin.CreateSubscription(ctx, createReq)
if err != nil {
return nil, err
}

// Request 2 (optional) - seek the subscription.
if requiresSeek {
if _, err = ac.SeekSubscription(ctx, subsPath.String(), settings.target); err != nil {
return nil, err
}
}

// Request 3 (optional) - make the export subscription active.
if requiresUpdate {
updateReq := &pb.UpdateSubscriptionRequest{
Subscription: &pb.Subscription{
Name: subsPath.String(),
ExportConfig: &pb.ExportConfig{DesiredState: pb.ExportConfig_ACTIVE},
},
UpdateMask: &fmpb.FieldMask{Paths: []string{"export_config.desired_state"}},
}
if subspb, err = ac.admin.UpdateSubscription(ctx, updateReq); err != nil {
return nil, err
}
}
return protoToSubscriptionConfig(subspb), nil
}

Expand Down

0 comments on commit 5fa8555

Please sign in to comment.