Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub/kafka: Allow overriding batching options for Topic #3163

Merged
merged 1 commit into from Sep 13, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions pubsub/kafkapubsub/kafka.go
Expand Up @@ -73,10 +73,11 @@ import (

var sendBatcherOpts = &batcher.Options{
MaxBatchSize: 100,
MaxHandlers: 2,
MaxHandlers: 100, // max concurrency for sends
}

var recvBatcherOpts = &batcher.Options{
// Concurrency doesn't make sense here.
MaxBatchSize: 1,
MaxHandlers: 1,
}
Expand Down Expand Up @@ -216,6 +217,9 @@ type TopicOptions struct {
// the value for that key will be used as the message key when sending to
// Kafka, instead of being added to the message headers.
KeyName string

// BatcherOptions adds constraints to the default batching done for sends.
BatcherOptions batcher.Options
}

// OpenTopic creates a pubsub.Topic that sends to a Kafka topic.
Expand All @@ -230,7 +234,8 @@ func OpenTopic(brokers []string, config *sarama.Config, topicName string, opts *
if err != nil {
return nil, err
}
return pubsub.NewTopic(dt, sendBatcherOpts), nil
bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(dt, bo), nil
}

// openTopic returns the driver for OpenTopic. This function exists so the test
Expand Down