Skip to content

Commit

Permalink
Merge pull request IBM#2339 from Jacob-bzx/Jacob-bzx/support-multiple…
Browse files Browse the repository at this point in the history
…-balance-strategies

feat(consumer): support multiple balance strategies
  • Loading branch information
dnwe authored and remicalixte committed Dec 1, 2023
1 parent f75a462 commit 5eb6beb
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 24 deletions.
27 changes: 24 additions & 3 deletions config.go
Expand Up @@ -270,7 +270,16 @@ type Config struct {
}
Rebalance struct {
// Strategy for allocating topic partitions to members (default BalanceStrategyRange)
// Deprecated: Strategy exists for historical compatibility
// and should not be used. Please use GroupStrategies.
Strategy BalanceStrategy

// GroupStrategies is the priority-ordered list of client-side consumer group
// balancing strategies that will be offered to the coordinator. The first
// strategy that all group members support will be chosen by the leader.
// default: [BalanceStrategyRange]
GroupStrategies []BalanceStrategy

// The maximum allowed time for each worker to join the group once a rebalance has begun.
// This is basically a limit on the amount of time needed for all tasks to flush any pending
// data and commit offsets. If the timeout is exceeded, then the worker will be removed from
Expand Down Expand Up @@ -498,7 +507,7 @@ func NewConfig() *Config {

c.Consumer.Group.Session.Timeout = 10 * time.Second
c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{BalanceStrategyRange}
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
c.Consumer.Group.Rebalance.Retry.Max = 4
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
Expand Down Expand Up @@ -737,6 +746,10 @@ func (c *Config) Validate() error {
Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
" and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
}
if c.Consumer.Group.Rebalance.Strategy != nil {
Logger.Println("Deprecation warning: Consumer.Group.Rebalance.Strategy exists for historical compatibility" +
" and should not be used. Please use Consumer.Group.Rebalance.GroupStrategies")
}

// validate IsolationLevel
if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
Expand All @@ -751,8 +764,10 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms")
case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout:
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout")
case c.Consumer.Group.Rebalance.Strategy == nil:
return ConfigurationError("Consumer.Group.Rebalance.Strategy must not be empty")
case c.Consumer.Group.Rebalance.Strategy == nil && len(c.Consumer.Group.Rebalance.GroupStrategies) == 0:
return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies or Consumer.Group.Rebalance.Strategy must not be empty")
case c.Consumer.Group.Rebalance.Strategy != nil && len(c.Consumer.Group.Rebalance.GroupStrategies) != 0:
return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies and Consumer.Group.Rebalance.Strategy cannot be set at the same time")
case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond:
return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms")
case c.Consumer.Group.Rebalance.Retry.Max < 0:
Expand All @@ -761,6 +776,12 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
}

for _, strategy := range c.Consumer.Group.Rebalance.GroupStrategies {
if strategy == nil {
return ConfigurationError("elements in Consumer.Group.Rebalance.Strategies must not be empty")
}
}

// validate misc shared values
switch {
case c.ChannelBufferSize < 0:
Expand Down
66 changes: 50 additions & 16 deletions consumer_group.go
Expand Up @@ -269,6 +269,17 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return nil, join.Err
}

var strategy BalanceStrategy
var ok bool
if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy == nil {
strategy, ok = c.findStrategy(join.GroupProtocol, c.config.Consumer.Group.Rebalance.GroupStrategies)
if !ok {
// this case shouldn't happen in practice, since the leader will choose the protocol
// that all the members support
return nil, fmt.Errorf("unable to find selected strategy: %s", join.GroupProtocol)
}
}

// Prepare distribution plan if we joined as the leader
var plan BalanceStrategyPlan
if join.LeaderId == join.MemberId {
Expand All @@ -277,14 +288,14 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return nil, err
}

plan, err = c.balance(members)
plan, err = c.balance(strategy, members)
if err != nil {
return nil, err
}
}

// Sync consumer group
groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
syncGroupResponse, err := c.syncGroupRequest(coordinator, plan, join.GenerationId, strategy)
if consumerGroupSyncTotal != nil {
consumerGroupSyncTotal.Inc(1)
}
Expand All @@ -295,37 +306,37 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
}
return nil, err
}
if groupRequest.Err != ErrNoError {
if !errors.Is(syncGroupResponse.Err, ErrNoError) {
if consumerGroupSyncFailed != nil {
consumerGroupSyncFailed.Inc(1)
}
}

switch groupRequest.Err {
switch syncGroupResponse.Err {
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, topics, handler, retries)
case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
if retries <= 0 {
return nil, groupRequest.Err
return nil, syncGroupResponse.Err
}

return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, groupRequest.Err
return nil, syncGroupResponse.Err
}

return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, groupRequest.Err
return nil, syncGroupResponse.Err
}

// Retrieve and sort claims
var claims map[string][]int32
if len(groupRequest.MemberAssignment) > 0 {
members, err := groupRequest.GetMemberAssignment()
if len(syncGroupResponse.MemberAssignment) > 0 {
members, err := syncGroupResponse.GetMemberAssignment()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -364,21 +375,45 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
Topics: topics,
UserData: c.userData,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
return nil, err
var strategy BalanceStrategy
if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy != nil {
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
return nil, err
}
} else {
for _, strategy = range c.config.Consumer.Group.Rebalance.GroupStrategies {
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
return nil, err
}
}
}

return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
// findStrategy returns the BalanceStrategy with the specified protocolName
// from the slice provided.
func (c *consumerGroup) findStrategy(name string, groupStrategies []BalanceStrategy) (BalanceStrategy, bool) {
for _, strategy := range groupStrategies {
if strategy.Name() == name {
return strategy, true
}
}
return nil, false
}

func (c *consumerGroup) syncGroupRequest(
coordinator *Broker,
plan BalanceStrategyPlan,
generationID int32,
strategy BalanceStrategy,
) (*SyncGroupResponse, error) {
req := &SyncGroupRequest{
GroupId: c.groupID,
MemberId: c.memberID,
GenerationId: generationID,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy

for memberID, topics := range plan {
assignment := &ConsumerGroupMemberAssignment{Topics: topics}
userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
Expand All @@ -403,7 +438,7 @@ func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, g
return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
func (c *consumerGroup) balance(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
topics := make(map[string][]int32)
for _, meta := range members {
for _, topic := range meta.Topics {
Expand All @@ -419,7 +454,6 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata)
topics[topic] = partitions
}

strategy := c.config.Consumer.Group.Rebalance.Strategy
return strategy.Plan(members, topics)
}

Expand Down
6 changes: 3 additions & 3 deletions examples/consumergroup/main.go
Expand Up @@ -68,11 +68,11 @@ func main() {

switch assignor {
case "sticky":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky}
case "roundrobin":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
case "range":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange}
default:
log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
}
Expand Down
4 changes: 2 additions & 2 deletions functional_consumer_group_test.go
Expand Up @@ -58,7 +58,7 @@ func TestFuncConsumerGroupPartitioningStateful(t *testing.T) {

m1s := newTestStatefulStrategy(t)
config := defaultConfig("M1")
config.Consumer.Group.Rebalance.Strategy = m1s
config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{m1s}
config.Consumer.Group.Member.UserData = []byte(config.ClientID)

// start M1
Expand All @@ -71,7 +71,7 @@ func TestFuncConsumerGroupPartitioningStateful(t *testing.T) {

m2s := newTestStatefulStrategy(t)
config = defaultConfig("M2")
config.Consumer.Group.Rebalance.Strategy = m2s
config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{m2s}
config.Consumer.Group.Member.UserData = []byte(config.ClientID)

// start M2
Expand Down

0 comments on commit 5eb6beb

Please sign in to comment.