diff --git a/pubsub/awssnssqs/awssnssqs.go b/pubsub/awssnssqs/awssnssqs.go index a81fb61001..7b357455e9 100644 --- a/pubsub/awssnssqs/awssnssqs.go +++ b/pubsub/awssnssqs/awssnssqs.go @@ -357,6 +357,9 @@ type TopicOptions struct { // BodyBase64Encoding determines when message bodies are base64 encoded. // The default is NonUTF8Only. BodyBase64Encoding BodyBase64Encoding + + // BatcherOptions adds constraints to the default batching done for sends. + BatcherOptions batcher.Options } // OpenTopic is a shortcut for OpenSNSTopic, provided for backwards compatibility. @@ -367,13 +370,15 @@ func OpenTopic(ctx context.Context, sess client.ConfigProvider, topicARN string, // OpenSNSTopic opens a topic that sends to the SNS topic with the given Amazon // Resource Name (ARN). func OpenSNSTopic(ctx context.Context, sess client.ConfigProvider, topicARN string, opts *TopicOptions) *pubsub.Topic { - return pubsub.NewTopic(openSNSTopic(ctx, sns.New(sess), topicARN, opts), sendBatcherOptsSNS) + bo := sendBatcherOptsSNS.NewMergedOptions(&opts.BatcherOptions) + return pubsub.NewTopic(openSNSTopic(ctx, sns.New(sess), topicARN, opts), bo) } // OpenSNSTopicV2 opens a topic that sends to the SNS topic with the given Amazon // Resource Name (ARN), using AWS SDK V2. func OpenSNSTopicV2(ctx context.Context, client *snsv2.Client, topicARN string, opts *TopicOptions) *pubsub.Topic { - return pubsub.NewTopic(openSNSTopicV2(ctx, client, topicARN, opts), sendBatcherOptsSNS) + bo := sendBatcherOptsSNS.NewMergedOptions(&opts.BatcherOptions) + return pubsub.NewTopic(openSNSTopicV2(ctx, client, topicARN, opts), bo) } // openSNSTopic returns the driver for OpenSNSTopic. This function exists so the test @@ -600,13 +605,15 @@ type sqsTopic struct { // OpenSQSTopic opens a topic that sends to the SQS topic with the given SQS // queue URL. func OpenSQSTopic(ctx context.Context, sess client.ConfigProvider, qURL string, opts *TopicOptions) *pubsub.Topic { - return pubsub.NewTopic(openSQSTopic(ctx, sqs.New(sess), qURL, opts), sendBatcherOptsSQS) + bo := sendBatcherOptsSQS.NewMergedOptions(&opts.BatcherOptions) + return pubsub.NewTopic(openSQSTopic(ctx, sqs.New(sess), qURL, opts), bo) } // OpenSQSTopicV2 opens a topic that sends to the SQS topic with the given SQS // queue URL, using AWS SDK V2. func OpenSQSTopicV2(ctx context.Context, client *sqsv2.Client, qURL string, opts *TopicOptions) *pubsub.Topic { - return pubsub.NewTopic(openSQSTopicV2(ctx, client, qURL, opts), sendBatcherOptsSQS) + bo := sendBatcherOptsSQS.NewMergedOptions(&opts.BatcherOptions) + return pubsub.NewTopic(openSQSTopicV2(ctx, client, qURL, opts), bo) } // openSQSTopic returns the driver for OpenSQSTopic. This function exists so the test @@ -916,20 +923,30 @@ type SubscriptionOptions struct { // Note that a non-zero WaitTime can delay delivery of messages // by up to that duration. WaitTime time.Duration + + // ReceiveBatcherOptions adds constraints to the default batching done for receives. + ReceiveBatcherOptions batcher.Options + + // AckBatcherOptions adds constraints to the default batching done for acks. + AckBatcherOptions batcher.Options } // OpenSubscription opens a subscription based on AWS SQS for the given SQS // queue URL. The queue is assumed to be subscribed to some SNS topic, though // there is no check for this. func OpenSubscription(ctx context.Context, sess client.ConfigProvider, qURL string, opts *SubscriptionOptions) *pubsub.Subscription { - return pubsub.NewSubscription(openSubscription(ctx, sqs.New(sess), qURL, opts), recvBatcherOpts, ackBatcherOpts) + rbo := recvBatcherOpts.NewMergedOptions(&opts.ReceiveBatcherOptions) + abo := ackBatcherOpts.NewMergedOptions(&opts.AckBatcherOptions) + return pubsub.NewSubscription(openSubscription(ctx, sqs.New(sess), qURL, opts), rbo, abo) } // OpenSubscriptionV2 opens a subscription based on AWS SQS for the given SQS // queue URL, using AWS SDK V2. The queue is assumed to be subscribed to some SNS topic, though // there is no check for this. func OpenSubscriptionV2(ctx context.Context, client *sqsv2.Client, qURL string, opts *SubscriptionOptions) *pubsub.Subscription { - return pubsub.NewSubscription(openSubscriptionV2(ctx, client, qURL, opts), recvBatcherOpts, ackBatcherOpts) + rbo := recvBatcherOpts.NewMergedOptions(&opts.ReceiveBatcherOptions) + abo := ackBatcherOpts.NewMergedOptions(&opts.AckBatcherOptions) + return pubsub.NewSubscription(openSubscriptionV2(ctx, client, qURL, opts), rbo, abo) } // openSubscription returns a driver.Subscription. diff --git a/pubsub/azuresb/azuresb.go b/pubsub/azuresb/azuresb.go index 34cdc9189a..733ec6a0e2 100644 --- a/pubsub/azuresb/azuresb.go +++ b/pubsub/azuresb/azuresb.go @@ -87,6 +87,11 @@ var recvBatcherOpts = &batcher.Options{ MaxHandlers: 100, // max concurrency for reads } +var ackBatcherOpts = &batcher.Options{ + MaxBatchSize: 1, + MaxHandlers: 100, // max concurrency for acks +} + func init() { o := new(defaultOpener) pubsub.DefaultURLMux().RegisterTopic(Scheme, o) @@ -214,7 +219,10 @@ type topic struct { } // TopicOptions provides configuration options for an Azure SB Topic. -type TopicOptions struct{} +type TopicOptions struct { + // BatcherOptions adds constraints to the default batching done for sends. + BatcherOptions batcher.Options +} // NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string. // https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues @@ -238,7 +246,11 @@ func OpenTopic(ctx context.Context, sbSender *servicebus.Sender, opts *TopicOpti if err != nil { return nil, err } - return pubsub.NewTopic(t, sendBatcherOpts), nil + if opts == nil { + opts = &TopicOptions{} + } + bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions) + return pubsub.NewTopic(t, bo), nil } // openTopic returns the driver for OpenTopic. This function exists so the test @@ -348,6 +360,13 @@ type SubscriptionOptions struct { // When true: pubsub.Message.Ack will be a no-op, pubsub.Message.Nackable // will return true, and pubsub.Message.Nack will panic. ReceiveAndDelete bool + + // ReceiveBatcherOptions adds constraints to the default batching done for receives. + ReceiveBatcherOptions batcher.Options + + // AckBatcherOptions adds constraints to the default batching done for acks. + // Only used when ReceiveAndDelete is false. + AckBatcherOptions batcher.Options } // OpenSubscription initializes a pubsub Subscription on a given Service Bus Subscription and its parent Service Bus Topic. @@ -356,7 +375,12 @@ func OpenSubscription(ctx context.Context, sbClient *servicebus.Client, sbReceiv if err != nil { return nil, err } - return pubsub.NewSubscription(ds, recvBatcherOpts, nil), nil + if opts == nil { + opts = &SubscriptionOptions{} + } + rbo := recvBatcherOpts.NewMergedOptions(&opts.ReceiveBatcherOptions) + abo := ackBatcherOpts.NewMergedOptions(&opts.AckBatcherOptions) + return pubsub.NewSubscription(ds, rbo, abo), nil } // openSubscription returns a driver.Subscription. @@ -408,7 +432,7 @@ func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*dr defer cancel() var messages []*driver.Message - sbmsgs, err := s.sbReceiver.ReceiveMessages(rctx, recvBatcherOpts.MaxBatchSize, nil) + sbmsgs, err := s.sbReceiver.ReceiveMessages(rctx, maxMessages, nil) for _, sbmsg := range sbmsgs { metadata := map[string]string{} for key, value := range sbmsg.ApplicationProperties { diff --git a/pubsub/batcher/batcher.go b/pubsub/batcher/batcher.go index 1b31437c49..917cef822a 100644 --- a/pubsub/batcher/batcher.go +++ b/pubsub/batcher/batcher.go @@ -114,6 +114,33 @@ func newOptionsWithDefaults(opts *Options) Options { return o } +// newMergedOptions returns o merged with opts. +func (o *Options) NewMergedOptions(opts *Options) *Options { + maxH := o.MaxHandlers + if opts.MaxHandlers != 0 && (maxH == 0 || opts.MaxHandlers < maxH) { + maxH = opts.MaxHandlers + } + minB := o.MinBatchSize + if opts.MinBatchSize != 0 && (minB == 0 || opts.MinBatchSize > minB) { + minB = opts.MinBatchSize + } + maxB := o.MaxBatchSize + if opts.MaxBatchSize != 0 && (maxB == 0 || opts.MaxBatchSize < maxB) { + maxB = opts.MaxBatchSize + } + maxBB := o.MaxBatchByteSize + if opts.MaxBatchByteSize != 0 && (maxBB == 0 || opts.MaxBatchByteSize < maxBB) { + maxBB = opts.MaxBatchByteSize + } + c := &Options{ + MaxHandlers: maxH, + MinBatchSize: minB, + MaxBatchSize: maxB, + MaxBatchByteSize: maxBB, + } + return c +} + // New creates a new Batcher. // // itemType is type that will be batched. For example, if you diff --git a/pubsub/gcppubsub/gcppubsub.go b/pubsub/gcppubsub/gcppubsub.go index 3e81d1019d..eaa7838007 100644 --- a/pubsub/gcppubsub/gcppubsub.go +++ b/pubsub/gcppubsub/gcppubsub.go @@ -301,7 +301,10 @@ func SubscriberClient(ctx context.Context, conn *grpc.ClientConn) (*raw.Subscrib } // TopicOptions will contain configuration for topics. -type TopicOptions struct{} +type TopicOptions struct { + // BatcherOptions adds constraints to the default batching done for sends. + BatcherOptions batcher.Options +} // OpenTopic returns a *pubsub.Topic backed by an existing GCP PubSub topic // in the given projectID. topicName is the last part of the full topic @@ -309,7 +312,11 @@ type TopicOptions struct{} // See the package documentation for an example. func OpenTopic(client *raw.PublisherClient, projectID gcp.ProjectID, topicName string, opts *TopicOptions) *pubsub.Topic { topicPath := fmt.Sprintf("projects/%s/topics/%s", projectID, topicName) - return pubsub.NewTopic(openTopic(client, topicPath), sendBatcherOpts) + if opts == nil { + opts = &TopicOptions{} + } + bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions) + return pubsub.NewTopic(openTopic(client, topicPath), bo) } var topicPathRE = regexp.MustCompile("^projects/.+/topics/.+$") @@ -321,7 +328,11 @@ func OpenTopicByPath(client *raw.PublisherClient, topicPath string, opts *TopicO if !topicPathRE.MatchString(topicPath) { return nil, fmt.Errorf("invalid topicPath %q; must match %v", topicPath, topicPathRE) } - return pubsub.NewTopic(openTopic(client, topicPath), sendBatcherOpts), nil + if opts == nil { + opts = &TopicOptions{} + } + bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions) + return pubsub.NewTopic(openTopic(client, topicPath), bo), nil } // openTopic returns the driver for OpenTopic. This function exists so the test @@ -424,6 +435,12 @@ type subscription struct { type SubscriptionOptions struct { // MaxBatchSize caps the maximum batch size used when retrieving messages. It defaults to 1000. MaxBatchSize int + + // ReceiveBatcherOptions adds constraints to the default batching done for receives. + ReceiveBatcherOptions batcher.Options + + // AckBatcherOptions adds constraints to the default batching done for acks. + AckBatcherOptions batcher.Options } // OpenSubscription returns a *pubsub.Subscription backed by an existing GCP @@ -435,7 +452,9 @@ func OpenSubscription(client *raw.SubscriberClient, projectID gcp.ProjectID, sub dsub := openSubscription(client, path, opts) recvOpts := *defaultRecvBatcherOpts recvOpts.MaxBatchSize = dsub.options.MaxBatchSize - return pubsub.NewSubscription(dsub, &recvOpts, ackBatcherOpts) + rbo := recvOpts.NewMergedOptions(&dsub.options.ReceiveBatcherOptions) + abo := ackBatcherOpts.NewMergedOptions(&dsub.options.AckBatcherOptions) + return pubsub.NewSubscription(dsub, rbo, abo) } var subscriptionPathRE = regexp.MustCompile("^projects/.+/subscriptions/.+$") @@ -452,7 +471,9 @@ func OpenSubscriptionByPath(client *raw.SubscriberClient, subscriptionPath strin dsub := openSubscription(client, subscriptionPath, opts) recvOpts := *defaultRecvBatcherOpts recvOpts.MaxBatchSize = dsub.options.MaxBatchSize - return pubsub.NewSubscription(dsub, &recvOpts, ackBatcherOpts), nil + rbo := recvOpts.NewMergedOptions(&dsub.options.ReceiveBatcherOptions) + abo := ackBatcherOpts.NewMergedOptions(&dsub.options.AckBatcherOptions) + return pubsub.NewSubscription(dsub, rbo, abo), nil } // openSubscription returns a driver.Subscription. @@ -460,11 +481,9 @@ func openSubscription(client *raw.SubscriberClient, subscriptionPath string, opt if opts == nil { opts = &SubscriptionOptions{} } - if opts.MaxBatchSize == 0 { opts.MaxBatchSize = defaultRecvBatcherOpts.MaxBatchSize } - return &subscription{client, subscriptionPath, opts} }