Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: Add support for overriding batch to AWS, GCP, Azure drivers #3158

Merged
merged 1 commit into from Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 23 additions & 6 deletions pubsub/awssnssqs/awssnssqs.go
Expand Up @@ -357,6 +357,9 @@ type TopicOptions struct {
// BodyBase64Encoding determines when message bodies are base64 encoded.
// The default is NonUTF8Only.
BodyBase64Encoding BodyBase64Encoding

// BatcherOptions adds constraints to the default batching done for sends.
BatcherOptions batcher.Options
}

// OpenTopic is a shortcut for OpenSNSTopic, provided for backwards compatibility.
Expand All @@ -367,13 +370,15 @@ func OpenTopic(ctx context.Context, sess client.ConfigProvider, topicARN string,
// OpenSNSTopic opens a topic that sends to the SNS topic with the given Amazon
// Resource Name (ARN).
func OpenSNSTopic(ctx context.Context, sess client.ConfigProvider, topicARN string, opts *TopicOptions) *pubsub.Topic {
return pubsub.NewTopic(openSNSTopic(ctx, sns.New(sess), topicARN, opts), sendBatcherOptsSNS)
bo := sendBatcherOptsSNS.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openSNSTopic(ctx, sns.New(sess), topicARN, opts), bo)
}

// OpenSNSTopicV2 opens a topic that sends to the SNS topic with the given Amazon
// Resource Name (ARN), using AWS SDK V2.
func OpenSNSTopicV2(ctx context.Context, client *snsv2.Client, topicARN string, opts *TopicOptions) *pubsub.Topic {
return pubsub.NewTopic(openSNSTopicV2(ctx, client, topicARN, opts), sendBatcherOptsSNS)
bo := sendBatcherOptsSNS.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openSNSTopicV2(ctx, client, topicARN, opts), bo)
}

// openSNSTopic returns the driver for OpenSNSTopic. This function exists so the test
Expand Down Expand Up @@ -600,13 +605,15 @@ type sqsTopic struct {
// OpenSQSTopic opens a topic that sends to the SQS topic with the given SQS
// queue URL.
func OpenSQSTopic(ctx context.Context, sess client.ConfigProvider, qURL string, opts *TopicOptions) *pubsub.Topic {
return pubsub.NewTopic(openSQSTopic(ctx, sqs.New(sess), qURL, opts), sendBatcherOptsSQS)
bo := sendBatcherOptsSQS.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openSQSTopic(ctx, sqs.New(sess), qURL, opts), bo)
}

// OpenSQSTopicV2 opens a topic that sends to the SQS topic with the given SQS
// queue URL, using AWS SDK V2.
func OpenSQSTopicV2(ctx context.Context, client *sqsv2.Client, qURL string, opts *TopicOptions) *pubsub.Topic {
return pubsub.NewTopic(openSQSTopicV2(ctx, client, qURL, opts), sendBatcherOptsSQS)
bo := sendBatcherOptsSQS.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openSQSTopicV2(ctx, client, qURL, opts), bo)
}

// openSQSTopic returns the driver for OpenSQSTopic. This function exists so the test
Expand Down Expand Up @@ -916,20 +923,30 @@ type SubscriptionOptions struct {
// Note that a non-zero WaitTime can delay delivery of messages
// by up to that duration.
WaitTime time.Duration

// ReceiveBatcherOptions adds constraints to the default batching done for receives.
ReceiveBatcherOptions batcher.Options

// AckBatcherOptions adds constraints to the default batching done for acks.
AckBatcherOptions batcher.Options
}

// OpenSubscription opens a subscription based on AWS SQS for the given SQS
// queue URL. The queue is assumed to be subscribed to some SNS topic, though
// there is no check for this.
func OpenSubscription(ctx context.Context, sess client.ConfigProvider, qURL string, opts *SubscriptionOptions) *pubsub.Subscription {
return pubsub.NewSubscription(openSubscription(ctx, sqs.New(sess), qURL, opts), recvBatcherOpts, ackBatcherOpts)
rbo := recvBatcherOpts.NewMergedOptions(&opts.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&opts.AckBatcherOptions)
return pubsub.NewSubscription(openSubscription(ctx, sqs.New(sess), qURL, opts), rbo, abo)
}

// OpenSubscriptionV2 opens a subscription based on AWS SQS for the given SQS
// queue URL, using AWS SDK V2. The queue is assumed to be subscribed to some SNS topic, though
// there is no check for this.
func OpenSubscriptionV2(ctx context.Context, client *sqsv2.Client, qURL string, opts *SubscriptionOptions) *pubsub.Subscription {
return pubsub.NewSubscription(openSubscriptionV2(ctx, client, qURL, opts), recvBatcherOpts, ackBatcherOpts)
rbo := recvBatcherOpts.NewMergedOptions(&opts.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&opts.AckBatcherOptions)
return pubsub.NewSubscription(openSubscriptionV2(ctx, client, qURL, opts), rbo, abo)
}

// openSubscription returns a driver.Subscription.
Expand Down
32 changes: 28 additions & 4 deletions pubsub/azuresb/azuresb.go
Expand Up @@ -87,6 +87,11 @@ var recvBatcherOpts = &batcher.Options{
MaxHandlers: 100, // max concurrency for reads
}

var ackBatcherOpts = &batcher.Options{
MaxBatchSize: 1,
MaxHandlers: 100, // max concurrency for acks
}

func init() {
o := new(defaultOpener)
pubsub.DefaultURLMux().RegisterTopic(Scheme, o)
Expand Down Expand Up @@ -214,7 +219,10 @@ type topic struct {
}

// TopicOptions provides configuration options for an Azure SB Topic.
type TopicOptions struct{}
type TopicOptions struct {
// BatcherOptions adds constraints to the default batching done for sends.
BatcherOptions batcher.Options
}

// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
Expand All @@ -238,7 +246,11 @@ func OpenTopic(ctx context.Context, sbSender *servicebus.Sender, opts *TopicOpti
if err != nil {
return nil, err
}
return pubsub.NewTopic(t, sendBatcherOpts), nil
if opts == nil {
opts = &TopicOptions{}
}
bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(t, bo), nil
}

// openTopic returns the driver for OpenTopic. This function exists so the test
Expand Down Expand Up @@ -348,6 +360,13 @@ type SubscriptionOptions struct {
// When true: pubsub.Message.Ack will be a no-op, pubsub.Message.Nackable
// will return true, and pubsub.Message.Nack will panic.
ReceiveAndDelete bool

// ReceiveBatcherOptions adds constraints to the default batching done for receives.
ReceiveBatcherOptions batcher.Options

// AckBatcherOptions adds constraints to the default batching done for acks.
// Only used when ReceiveAndDelete is false.
AckBatcherOptions batcher.Options
}

// OpenSubscription initializes a pubsub Subscription on a given Service Bus Subscription and its parent Service Bus Topic.
Expand All @@ -356,7 +375,12 @@ func OpenSubscription(ctx context.Context, sbClient *servicebus.Client, sbReceiv
if err != nil {
return nil, err
}
return pubsub.NewSubscription(ds, recvBatcherOpts, nil), nil
if opts == nil {
opts = &SubscriptionOptions{}
}
rbo := recvBatcherOpts.NewMergedOptions(&opts.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&opts.AckBatcherOptions)
return pubsub.NewSubscription(ds, rbo, abo), nil
}

// openSubscription returns a driver.Subscription.
Expand Down Expand Up @@ -408,7 +432,7 @@ func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*dr
defer cancel()

var messages []*driver.Message
sbmsgs, err := s.sbReceiver.ReceiveMessages(rctx, recvBatcherOpts.MaxBatchSize, nil)
sbmsgs, err := s.sbReceiver.ReceiveMessages(rctx, maxMessages, nil)
for _, sbmsg := range sbmsgs {
metadata := map[string]string{}
for key, value := range sbmsg.ApplicationProperties {
Expand Down
27 changes: 27 additions & 0 deletions pubsub/batcher/batcher.go
Expand Up @@ -114,6 +114,33 @@ func newOptionsWithDefaults(opts *Options) Options {
return o
}

// newMergedOptions returns o merged with opts.
func (o *Options) NewMergedOptions(opts *Options) *Options {
maxH := o.MaxHandlers
if opts.MaxHandlers != 0 && (maxH == 0 || opts.MaxHandlers < maxH) {
maxH = opts.MaxHandlers
}
minB := o.MinBatchSize
if opts.MinBatchSize != 0 && (minB == 0 || opts.MinBatchSize > minB) {
minB = opts.MinBatchSize
}
maxB := o.MaxBatchSize
if opts.MaxBatchSize != 0 && (maxB == 0 || opts.MaxBatchSize < maxB) {
maxB = opts.MaxBatchSize
}
maxBB := o.MaxBatchByteSize
if opts.MaxBatchByteSize != 0 && (maxBB == 0 || opts.MaxBatchByteSize < maxBB) {
maxBB = opts.MaxBatchByteSize
}
c := &Options{
MaxHandlers: maxH,
MinBatchSize: minB,
MaxBatchSize: maxB,
MaxBatchByteSize: maxBB,
}
return c
}

// New creates a new Batcher.
//
// itemType is type that will be batched. For example, if you
Expand Down
33 changes: 26 additions & 7 deletions pubsub/gcppubsub/gcppubsub.go
Expand Up @@ -301,15 +301,22 @@ func SubscriberClient(ctx context.Context, conn *grpc.ClientConn) (*raw.Subscrib
}

// TopicOptions will contain configuration for topics.
type TopicOptions struct{}
type TopicOptions struct {
// BatcherOptions adds constraints to the default batching done for sends.
BatcherOptions batcher.Options
}

// OpenTopic returns a *pubsub.Topic backed by an existing GCP PubSub topic
// in the given projectID. topicName is the last part of the full topic
// path, e.g., "foo" from "projects/<projectID>/topic/foo".
// See the package documentation for an example.
func OpenTopic(client *raw.PublisherClient, projectID gcp.ProjectID, topicName string, opts *TopicOptions) *pubsub.Topic {
topicPath := fmt.Sprintf("projects/%s/topics/%s", projectID, topicName)
return pubsub.NewTopic(openTopic(client, topicPath), sendBatcherOpts)
if opts == nil {
opts = &TopicOptions{}
}
bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openTopic(client, topicPath), bo)
}

var topicPathRE = regexp.MustCompile("^projects/.+/topics/.+$")
Expand All @@ -321,7 +328,11 @@ func OpenTopicByPath(client *raw.PublisherClient, topicPath string, opts *TopicO
if !topicPathRE.MatchString(topicPath) {
return nil, fmt.Errorf("invalid topicPath %q; must match %v", topicPath, topicPathRE)
}
return pubsub.NewTopic(openTopic(client, topicPath), sendBatcherOpts), nil
if opts == nil {
opts = &TopicOptions{}
}
bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(openTopic(client, topicPath), bo), nil
}

// openTopic returns the driver for OpenTopic. This function exists so the test
Expand Down Expand Up @@ -424,6 +435,12 @@ type subscription struct {
type SubscriptionOptions struct {
// MaxBatchSize caps the maximum batch size used when retrieving messages. It defaults to 1000.
MaxBatchSize int

// ReceiveBatcherOptions adds constraints to the default batching done for receives.
ReceiveBatcherOptions batcher.Options

// AckBatcherOptions adds constraints to the default batching done for acks.
AckBatcherOptions batcher.Options
}

// OpenSubscription returns a *pubsub.Subscription backed by an existing GCP
Expand All @@ -435,7 +452,9 @@ func OpenSubscription(client *raw.SubscriberClient, projectID gcp.ProjectID, sub
dsub := openSubscription(client, path, opts)
recvOpts := *defaultRecvBatcherOpts
recvOpts.MaxBatchSize = dsub.options.MaxBatchSize
return pubsub.NewSubscription(dsub, &recvOpts, ackBatcherOpts)
rbo := recvOpts.NewMergedOptions(&dsub.options.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&dsub.options.AckBatcherOptions)
return pubsub.NewSubscription(dsub, rbo, abo)
}

var subscriptionPathRE = regexp.MustCompile("^projects/.+/subscriptions/.+$")
Expand All @@ -452,19 +471,19 @@ func OpenSubscriptionByPath(client *raw.SubscriberClient, subscriptionPath strin
dsub := openSubscription(client, subscriptionPath, opts)
recvOpts := *defaultRecvBatcherOpts
recvOpts.MaxBatchSize = dsub.options.MaxBatchSize
return pubsub.NewSubscription(dsub, &recvOpts, ackBatcherOpts), nil
rbo := recvOpts.NewMergedOptions(&dsub.options.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&dsub.options.AckBatcherOptions)
return pubsub.NewSubscription(dsub, rbo, abo), nil
}

// openSubscription returns a driver.Subscription.
func openSubscription(client *raw.SubscriberClient, subscriptionPath string, opts *SubscriptionOptions) *subscription {
if opts == nil {
opts = &SubscriptionOptions{}
}

if opts.MaxBatchSize == 0 {
opts.MaxBatchSize = defaultRecvBatcherOpts.MaxBatchSize
}

return &subscription{client, subscriptionPath, opts}
}

Expand Down