Skip to content

Commit

Permalink
pubsub: Add support for overriding batch to AWS, GCP, Azure drivers
Browse files Browse the repository at this point in the history
  • Loading branch information
vangent committed Aug 10, 2022
1 parent 0c45fa6 commit 12cb59e
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 17 deletions.
29 changes: 23 additions & 6 deletions pubsub/awssnssqs/awssnssqs.go
Expand Up @@ -357,6 +357,9 @@ type TopicOptions struct {
// BodyBase64Encoding determines when message bodies are base64 encoded.
// The default is NonUTF8Only.
BodyBase64Encoding BodyBase64Encoding

// BatcherOptions adds constraints to the default batching done for sends.
BatcherOptions batcher.Options
}

// OpenTopic is a shortcut for OpenSNSTopic, provided for backwards compatibility.
Expand All @@ -367,13 +370,15 @@ func OpenTopic(ctx context.Context, sess client.ConfigProvider, topicARN string,
// OpenSNSTopic opens a topic that sends to the SNS topic with the given Amazon
// Resource Name (ARN).
func OpenSNSTopic(ctx context.Context, sess client.ConfigProvider, topicARN string, opts *TopicOptions) *pubsub.Topic {
return pubsub.NewTopic(openSNSTopic(ctx, sns.New(sess), topicARN, opts), sendBatcherOptsSNS)
bo := sendBatcherOptsSNS.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openSNSTopic(ctx, sns.New(sess), topicARN, opts), bo)
}

// OpenSNSTopicV2 opens a topic that sends to the SNS topic with the given Amazon
// Resource Name (ARN), using AWS SDK V2.
func OpenSNSTopicV2(ctx context.Context, client *snsv2.Client, topicARN string, opts *TopicOptions) *pubsub.Topic {
return pubsub.NewTopic(openSNSTopicV2(ctx, client, topicARN, opts), sendBatcherOptsSNS)
bo := sendBatcherOptsSNS.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openSNSTopicV2(ctx, client, topicARN, opts), bo)
}

// openSNSTopic returns the driver for OpenSNSTopic. This function exists so the test
Expand Down Expand Up @@ -600,13 +605,15 @@ type sqsTopic struct {
// OpenSQSTopic opens a topic that sends to the SQS topic with the given SQS
// queue URL.
func OpenSQSTopic(ctx context.Context, sess client.ConfigProvider, qURL string, opts *TopicOptions) *pubsub.Topic {
return pubsub.NewTopic(openSQSTopic(ctx, sqs.New(sess), qURL, opts), sendBatcherOptsSQS)
bo := sendBatcherOptsSQS.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openSQSTopic(ctx, sqs.New(sess), qURL, opts), bo)
}

// OpenSQSTopicV2 opens a topic that sends to the SQS topic with the given SQS
// queue URL, using AWS SDK V2.
func OpenSQSTopicV2(ctx context.Context, client *sqsv2.Client, qURL string, opts *TopicOptions) *pubsub.Topic {
return pubsub.NewTopic(openSQSTopicV2(ctx, client, qURL, opts), sendBatcherOptsSQS)
bo := sendBatcherOptsSQS.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openSQSTopicV2(ctx, client, qURL, opts), bo)
}

// openSQSTopic returns the driver for OpenSQSTopic. This function exists so the test
Expand Down Expand Up @@ -916,20 +923,30 @@ type SubscriptionOptions struct {
// Note that a non-zero WaitTime can delay delivery of messages
// by up to that duration.
WaitTime time.Duration

// ReceiveBatcherOptions adds constraints to the default batching done for receives.
ReceiveBatcherOptions batcher.Options

// AckBatcherOptions adds constraints to the default batching done for acks.
AckBatcherOptions batcher.Options
}

// OpenSubscription opens a subscription based on AWS SQS for the given SQS
// queue URL. The queue is assumed to be subscribed to some SNS topic, though
// there is no check for this.
func OpenSubscription(ctx context.Context, sess client.ConfigProvider, qURL string, opts *SubscriptionOptions) *pubsub.Subscription {
return pubsub.NewSubscription(openSubscription(ctx, sqs.New(sess), qURL, opts), recvBatcherOpts, ackBatcherOpts)
rbo := recvBatcherOpts.NewMergedOptions(&opts.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&opts.AckBatcherOptions)
return pubsub.NewSubscription(openSubscription(ctx, sqs.New(sess), qURL, opts), rbo, abo)
}

// OpenSubscriptionV2 opens a subscription based on AWS SQS for the given SQS
// queue URL, using AWS SDK V2. The queue is assumed to be subscribed to some SNS topic, though
// there is no check for this.
func OpenSubscriptionV2(ctx context.Context, client *sqsv2.Client, qURL string, opts *SubscriptionOptions) *pubsub.Subscription {
return pubsub.NewSubscription(openSubscriptionV2(ctx, client, qURL, opts), recvBatcherOpts, ackBatcherOpts)
rbo := recvBatcherOpts.NewMergedOptions(&opts.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&opts.AckBatcherOptions)
return pubsub.NewSubscription(openSubscriptionV2(ctx, client, qURL, opts), rbo, abo)
}

// openSubscription returns a driver.Subscription.
Expand Down
32 changes: 28 additions & 4 deletions pubsub/azuresb/azuresb.go
Expand Up @@ -87,6 +87,11 @@ var recvBatcherOpts = &batcher.Options{
MaxHandlers: 100, // max concurrency for reads
}

var ackBatcherOpts = &batcher.Options{
MaxBatchSize: 1,
MaxHandlers: 100, // max concurrency for acks
}

func init() {
o := new(defaultOpener)
pubsub.DefaultURLMux().RegisterTopic(Scheme, o)
Expand Down Expand Up @@ -214,7 +219,10 @@ type topic struct {
}

// TopicOptions provides configuration options for an Azure SB Topic.
type TopicOptions struct{}
type TopicOptions struct {
// BatcherOptions adds constraints to the default batching done for sends.
BatcherOptions batcher.Options
}

// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
Expand All @@ -238,7 +246,11 @@ func OpenTopic(ctx context.Context, sbSender *servicebus.Sender, opts *TopicOpti
if err != nil {
return nil, err
}
return pubsub.NewTopic(t, sendBatcherOpts), nil
if opts == nil {
opts = &TopicOptions{}
}
bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(t, bo), nil
}

// openTopic returns the driver for OpenTopic. This function exists so the test
Expand Down Expand Up @@ -348,6 +360,13 @@ type SubscriptionOptions struct {
// When true: pubsub.Message.Ack will be a no-op, pubsub.Message.Nackable
// will return true, and pubsub.Message.Nack will panic.
ReceiveAndDelete bool

// ReceiveBatcherOptions adds constraints to the default batching done for receives.
ReceiveBatcherOptions batcher.Options

// AckBatcherOptions adds constraints to the default batching done for acks.
// Only used when ReceiveAndDelete is false.
AckBatcherOptions batcher.Options
}

// OpenSubscription initializes a pubsub Subscription on a given Service Bus Subscription and its parent Service Bus Topic.
Expand All @@ -356,7 +375,12 @@ func OpenSubscription(ctx context.Context, sbClient *servicebus.Client, sbReceiv
if err != nil {
return nil, err
}
return pubsub.NewSubscription(ds, recvBatcherOpts, nil), nil
if opts == nil {
opts = &SubscriptionOptions{}
}
rbo := recvBatcherOpts.NewMergedOptions(&opts.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&opts.AckBatcherOptions)
return pubsub.NewSubscription(ds, rbo, abo), nil
}

// openSubscription returns a driver.Subscription.
Expand Down Expand Up @@ -408,7 +432,7 @@ func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*dr
defer cancel()

var messages []*driver.Message
sbmsgs, err := s.sbReceiver.ReceiveMessages(rctx, recvBatcherOpts.MaxBatchSize, nil)
sbmsgs, err := s.sbReceiver.ReceiveMessages(rctx, maxMessages, nil)
for _, sbmsg := range sbmsgs {
metadata := map[string]string{}
for key, value := range sbmsg.ApplicationProperties {
Expand Down
27 changes: 27 additions & 0 deletions pubsub/batcher/batcher.go
Expand Up @@ -114,6 +114,33 @@ func newOptionsWithDefaults(opts *Options) Options {
return o
}

// newMergedOptions returns o merged with opts.
func (o *Options) NewMergedOptions(opts *Options) *Options {
maxH := o.MaxHandlers
if opts.MaxHandlers != 0 && (maxH == 0 || opts.MaxHandlers < maxH) {
maxH = opts.MaxHandlers
}
minB := o.MinBatchSize
if opts.MinBatchSize != 0 && (minB == 0 || opts.MinBatchSize > minB) {
minB = opts.MinBatchSize
}
maxB := o.MaxBatchSize
if opts.MaxBatchSize != 0 && (maxB == 0 || opts.MaxBatchSize < maxB) {
maxB = opts.MaxBatchSize
}
maxBB := o.MaxBatchByteSize
if opts.MaxBatchByteSize != 0 && (maxBB == 0 || opts.MaxBatchByteSize < maxBB) {
maxBB = opts.MaxBatchByteSize
}
c := &Options{
MaxHandlers: maxH,
MinBatchSize: minB,
MaxBatchSize: maxB,
MaxBatchByteSize: maxBB,
}
return c
}

// New creates a new Batcher.
//
// itemType is type that will be batched. For example, if you
Expand Down
33 changes: 26 additions & 7 deletions pubsub/gcppubsub/gcppubsub.go
Expand Up @@ -301,15 +301,22 @@ func SubscriberClient(ctx context.Context, conn *grpc.ClientConn) (*raw.Subscrib
}

// TopicOptions will contain configuration for topics.
type TopicOptions struct{}
type TopicOptions struct {
// BatcherOptions adds constraints to the default batching done for sends.
BatcherOptions batcher.Options
}

// OpenTopic returns a *pubsub.Topic backed by an existing GCP PubSub topic
// in the given projectID. topicName is the last part of the full topic
// path, e.g., "foo" from "projects/<projectID>/topic/foo".
// See the package documentation for an example.
func OpenTopic(client *raw.PublisherClient, projectID gcp.ProjectID, topicName string, opts *TopicOptions) *pubsub.Topic {
topicPath := fmt.Sprintf("projects/%s/topics/%s", projectID, topicName)
return pubsub.NewTopic(openTopic(client, topicPath), sendBatcherOpts)
if opts == nil {
opts = &TopicOptions{}
}
bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openTopic(client, topicPath), bo)
}

var topicPathRE = regexp.MustCompile("^projects/.+/topics/.+$")
Expand All @@ -321,7 +328,11 @@ func OpenTopicByPath(client *raw.PublisherClient, topicPath string, opts *TopicO
if !topicPathRE.MatchString(topicPath) {
return nil, fmt.Errorf("invalid topicPath %q; must match %v", topicPath, topicPathRE)
}
return pubsub.NewTopic(openTopic(client, topicPath), sendBatcherOpts), nil
if opts == nil {
opts = &TopicOptions{}
}
bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openTopic(client, topicPath), bo), nil
}

// openTopic returns the driver for OpenTopic. This function exists so the test
Expand Down Expand Up @@ -424,6 +435,12 @@ type subscription struct {
type SubscriptionOptions struct {
// MaxBatchSize caps the maximum batch size used when retrieving messages. It defaults to 1000.
MaxBatchSize int

// ReceiveBatcherOptions adds constraints to the default batching done for receives.
ReceiveBatcherOptions batcher.Options

// AckBatcherOptions adds constraints to the default batching done for acks.
AckBatcherOptions batcher.Options
}

// OpenSubscription returns a *pubsub.Subscription backed by an existing GCP
Expand All @@ -435,7 +452,9 @@ func OpenSubscription(client *raw.SubscriberClient, projectID gcp.ProjectID, sub
dsub := openSubscription(client, path, opts)
recvOpts := *defaultRecvBatcherOpts
recvOpts.MaxBatchSize = dsub.options.MaxBatchSize
return pubsub.NewSubscription(dsub, &recvOpts, ackBatcherOpts)
rbo := recvOpts.NewMergedOptions(&dsub.options.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&dsub.options.AckBatcherOptions)
return pubsub.NewSubscription(dsub, rbo, abo)
}

var subscriptionPathRE = regexp.MustCompile("^projects/.+/subscriptions/.+$")
Expand All @@ -452,19 +471,19 @@ func OpenSubscriptionByPath(client *raw.SubscriberClient, subscriptionPath strin
dsub := openSubscription(client, subscriptionPath, opts)
recvOpts := *defaultRecvBatcherOpts
recvOpts.MaxBatchSize = dsub.options.MaxBatchSize
return pubsub.NewSubscription(dsub, &recvOpts, ackBatcherOpts), nil
rbo := recvOpts.NewMergedOptions(&dsub.options.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&dsub.options.AckBatcherOptions)
return pubsub.NewSubscription(dsub, rbo, abo), nil
}

// openSubscription returns a driver.Subscription.
func openSubscription(client *raw.SubscriberClient, subscriptionPath string, opts *SubscriptionOptions) *subscription {
if opts == nil {
opts = &SubscriptionOptions{}
}

if opts.MaxBatchSize == 0 {
opts.MaxBatchSize = defaultRecvBatcherOpts.MaxBatchSize
}

return &subscription{client, subscriptionPath, opts}
}

Expand Down

0 comments on commit 12cb59e

Please sign in to comment.