Skip to content

Commit

Permalink
Session timeouts, heartbeat, max processing time
Browse files Browse the repository at this point in the history
New configurations added because, depending on the type of processing and the size of the batch, these values need to be configurable for every use case. We needed this to process huge batches of loads in a Redshift cluster. When the batch size was huge, timeouts were happening, leading to commits not going through. Details on why we are doing this are in issue #150 (comment)
  • Loading branch information
alok87 committed Mar 4, 2021
1 parent de0d32d commit c980597
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 16 deletions.
4 changes: 2 additions & 2 deletions controllers/batcher_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func NewBatcher(
Sarama: kafka.SaramaConfig{
Assignor: "range",
Oldest: true,
Log: false,
AutoCommit: true,
Log: true,
AutoCommit: false,
},
})
}
Expand Down
17 changes: 13 additions & 4 deletions controllers/loader_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func NewLoader(
var groupConfigs []kafka.ConsumerGroupConfig
for groupID, group := range consumerGroups {
totalTopics += len(group.topics)

// defaults for the loader
var sessionTimeoutSeconds int = 300
var hearbeatIntervalSeconds int = 120
var maxProcessingSeconds float32 = 1200

groupConfigs = append(groupConfigs, kafka.ConsumerGroupConfig{
GroupID: consumerGroupID(rsk.Name, rsk.Namespace, groupID, "-loader"),
TopicRegexes: expandTopicsToRegex(
Expand All @@ -113,10 +119,13 @@ func NewLoader(
TLSConfig: *tlsConfig,
},
Sarama: kafka.SaramaConfig{
Assignor: "range",
Oldest: true,
Log: false,
AutoCommit: false,
Assignor: "range",
Oldest: true,
Log: true,
AutoCommit: false,
SessionTimeoutSeconds: &sessionTimeoutSeconds,
HearbeatIntervalSeconds: &hearbeatIntervalSeconds,
MaxProcessingSeconds: &maxProcessingSeconds,
},
})
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/kafka/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ type KafkaConfig struct {
TLSConfig TLSConfig `yaml:"tlsConfig"`
}

type SaramaConfig struct {
Assignor string `yaml:"assignor"` // default is there
Oldest bool `yaml:"oldest"`
Log bool `yaml:"log"`
AutoCommit bool `yaml:"autoCommit"`
}

func NewConsumerGroup(
config ConsumerGroupConfig,
consumerGroupHandler sarama.ConsumerGroupHandler,
Expand All @@ -62,9 +55,6 @@ func NewConsumerGroup(
if config.LoaderTopicPrefix == "" {
config.LoaderTopicPrefix = "loader-"
}
if config.Sarama.Assignor == "" {
config.Sarama.Assignor = "range"
}

switch config.Kafka.KafkaClient {
case "sarama":
Expand Down
35 changes: 35 additions & 0 deletions pkg/kafka/consumer_group_sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,19 @@ import (
"log"
"os"
"strings"
"time"
)

// SaramaConfig holds the per-consumer-group tuning options for the sarama
// Kafka client, populated from YAML configuration. The three pointer fields
// are optional: when nil, NewSaramaConsumerGroup applies the default noted
// on each field instead of sarama's own (smaller) built-in default.
//
// NOTE(review): "Hearbeat" is a misspelling of "Heartbeat" that is part of
// the exported field name (callers already reference it), and the yaml tag
// "MaxProcessingSeconds" breaks the camelCase convention of the sibling tags
// ("sessionTimeoutSeconds", "heartBeatIntervalSeconds"). Both would need a
// coordinated rename across callers and config files to fix.
type SaramaConfig struct {
Assignor string `yaml:"assignor"` // partition assignment strategy; default is there
Oldest bool `yaml:"oldest"` // start consuming from the oldest offset when no committed offset exists
Log bool `yaml:"log"` // enable sarama's internal logging to stdout
AutoCommit bool `yaml:"autoCommit"` // enable sarama's automatic offset commits
SessionTimeoutSeconds *int `yaml:"sessionTimeoutSeconds,omitempty"` // default 20s
HearbeatIntervalSeconds *int `yaml:"heartBeatIntervalSeconds,omitempty"` // default 6s
MaxProcessingSeconds *float32 `yaml:"MaxProcessingSeconds,omitempty"` // default 0.5s (it is for the complete batch)
}

type saramaConsumerGroup struct {
// client is required to get Kafka cluster related info like Topics
client sarama.Client
Expand All @@ -32,6 +43,10 @@ func NewSaramaConsumerGroup(
ConsumerGroupInterface,
error,
) {
// set defaults
if config.Sarama.Assignor == "" {
config.Sarama.Assignor = "range"
}
if config.Sarama.Log {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
Expand All @@ -56,6 +71,24 @@ func NewSaramaConsumerGroup(
"Unknown group-partition assignor: %s", config.Sarama.Assignor)
}

if config.Sarama.SessionTimeoutSeconds != nil {
c.Consumer.Group.Session.Timeout = time.Duration(*config.Sarama.SessionTimeoutSeconds) * time.Second
} else {
c.Consumer.Group.Session.Timeout = 20 * time.Second // overwrites sarams default of 10s
}

if config.Sarama.HearbeatIntervalSeconds != nil {
c.Consumer.Group.Heartbeat.Interval = time.Duration(*config.Sarama.HearbeatIntervalSeconds) * time.Second
} else {
c.Consumer.Group.Heartbeat.Interval = 6 * time.Second // overwrites sarams default of 3s
}

if config.Sarama.MaxProcessingSeconds != nil {
c.Consumer.MaxProcessingTime = time.Duration(*config.Sarama.MaxProcessingSeconds) * time.Second
} else {
c.Consumer.MaxProcessingTime = 500 * time.Millisecond // overwrites saramas default of 100ms
}

if config.Kafka.TLSConfig.Enable {
c.Net.TLS.Enable = true
tlsConfig, err := NewTLSConfig(config.Kafka.TLSConfig)
Expand All @@ -78,6 +111,8 @@ func NewSaramaConsumerGroup(
// c.Consumer.Fetch.Max = 10
brokers := strings.Split(config.Kafka.Brokers, ",")

klog.V(2).Infof("cg:%s config: %+v", config.GroupID, c)

consumerGroup, err := sarama.NewConsumerGroup(
brokers, config.GroupID, c)
if err != nil {
Expand Down

0 comments on commit c980597

Please sign in to comment.