Skip to content

Commit

Permalink
pubsub/gcppubsub: make it possible to configure max_send_batch_size
Browse files Browse the repository at this point in the history
  • Loading branch information
bianpengyuan committed Jun 13, 2023
1 parent 0c8428b commit 1db413f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
25 changes: 21 additions & 4 deletions pubsub/gcppubsub/gcppubsub.go
Expand Up @@ -187,6 +187,7 @@ const Scheme = "gcppubsub"
// The following query parameters are supported:
//
// - max_recv_batch_size: sets SubscriptionOptions.MaxBatchSize.
// - max_send_batch_size: sets TopicOptions.BatcherOptions.MaxBatchSize.
// - nacklazy: sets SubscriberOptions.NackLazy. The value must be parseable by `strconv.ParseBool`.
//
// Currently their use is limited to subscribers.
Expand All @@ -203,20 +204,36 @@ type URLOpener struct {

// OpenTopicURL opens a pubsub.Topic based on u.
func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
for param := range u.Query() {
return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param)
opts := o.TopicOptions

for param, value := range u.Query() {
switch param {
case "max_send_batch_size":
maxBatchSize, err := queryParameterInt(value)
if err != nil {
return nil, fmt.Errorf("open topic %v: invalid query parameter %q: %v", u, param, err)
}

if maxBatchSize <= 0 || maxBatchSize > 1000 {
return nil, fmt.Errorf("open topic %v: invalid query parameter %q: must be between 1 and 1000", u, param)
}

opts.BatcherOptions.MaxBatchSize = maxBatchSize
default:
return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param)
}
}
pc, err := PublisherClient(ctx, o.Conn)
if err != nil {
return nil, err
}
topicPath := path.Join(u.Host, u.Path)
if topicPathRE.MatchString(topicPath) {
return OpenTopicByPath(pc, topicPath, &o.TopicOptions)
return OpenTopicByPath(pc, topicPath, &opts)
}
// Shortened form?
topicName := strings.TrimPrefix(u.Path, "/")
return OpenTopic(pc, gcp.ProjectID(u.Host), topicName, &o.TopicOptions), nil
return OpenTopic(pc, gcp.ProjectID(u.Host), topicName, &opts), nil
}

// OpenSubscriptionURL opens a pubsub.Subscription based on u.
Expand Down
6 changes: 6 additions & 0 deletions pubsub/gcppubsub/gcppubsub_test.go
Expand Up @@ -384,6 +384,12 @@ func TestOpenTopicFromURL(t *testing.T) {
{"gcppubsub://projects/myproject/topic/mytopic", false},
// Invalid parameter.
{"gcppubsub://myproject/mytopic?param=value", true},
// Valid max_send_batch_size
{"gcppubsub://projects/mytopic?max_send_batch_size=1", false},
// Invalid max_send_batch_size
{"gcppubsub://projects/mytopic?max_send_batch_size=0", true},
// Invalid max_send_batch_size
{"gcppubsub://projects/mytopic?max_send_batch_size=1001", true},
}

ctx := context.Background()
Expand Down

0 comments on commit 1db413f

Please sign in to comment.