Skip to content

Commit

Permalink
pubsub/kafka: Allow overriding batching options for Topic (#3163)
Browse files Browse the repository at this point in the history
  • Loading branch information
vangent committed Sep 13, 2022
1 parent 2ae6e17 commit 2c69298
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions pubsub/kafkapubsub/kafka.go
Expand Up @@ -73,10 +73,11 @@ import (

var sendBatcherOpts = &batcher.Options{
MaxBatchSize: 100,
MaxHandlers: 2,
MaxHandlers: 100, // max concurrency for sends
}

var recvBatcherOpts = &batcher.Options{
// Concurrency doesn't make sense here.
MaxBatchSize: 1,
MaxHandlers: 1,
}
Expand Down Expand Up @@ -216,6 +217,9 @@ type TopicOptions struct {
// the value for that key will be used as the message key when sending to
// Kafka, instead of being added to the message headers.
KeyName string

// BatcherOptions adds constraints to the default batching done for sends.
BatcherOptions batcher.Options
}

// OpenTopic creates a pubsub.Topic that sends to a Kafka topic.
Expand All @@ -230,7 +234,8 @@ func OpenTopic(brokers []string, config *sarama.Config, topicName string, opts *
if err != nil {
return nil, err
}
return pubsub.NewTopic(dt, sendBatcherOpts), nil
bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(dt, bo), nil
}

// openTopic returns the driver for OpenTopic. This function exists so the test
Expand Down

0 comments on commit 2c69298

Please sign in to comment.