From d66826a07ad05307df603d525f8dbf11875222c0 Mon Sep 17 00:00:00 2001 From: Jacob-bzx Date: Thu, 15 Sep 2022 00:50:46 +0800 Subject: [PATCH] add: support a group of balance strategies --- config.go | 28 +++++++++++++-- consumer_group.go | 60 +++++++++++++++++++++++-------- consumer_group_test.go | 2 +- examples/consumergroup/main.go | 6 ++-- functional_consumer_group_test.go | 4 +-- 5 files changed, 76 insertions(+), 24 deletions(-) diff --git a/config.go b/config.go index f6f14f5fc..2bf5a8f14 100644 --- a/config.go +++ b/config.go @@ -273,7 +273,16 @@ type Config struct { } Rebalance struct { // Strategy for allocating topic partitions to members (default BalanceStrategyRange) + // Deprecated: Strategy exists for historical compatibility + // and should not be used. Please use GroupStrategies. Strategy BalanceStrategy + + // GroupStrategies is the priority-ordered list of client-side consumer group + // balancing strategies that will be offered to the coordinator. The first + // strategy that all group members support will be chosen by the leader. + // default: [BalanceStrategyRange] + GroupStrategies []BalanceStrategy + // The maximum allowed time for each worker to join the group once a rebalance has begun. // This is basically a limit on the amount of time needed for all tasks to flush any pending // data and commit offsets. 
If the timeout is exceeded, then the worker will be removed from @@ -504,7 +513,7 @@ func NewConfig() *Config { c.Consumer.Group.Session.Timeout = 10 * time.Second c.Consumer.Group.Heartbeat.Interval = 3 * time.Second - c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange + c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{BalanceStrategyRange} c.Consumer.Group.Rebalance.Timeout = 60 * time.Second c.Consumer.Group.Rebalance.Retry.Max = 4 c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second @@ -745,6 +754,10 @@ func (c *Config) Validate() error { Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" + " and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored") } + if c.Consumer.Group.Rebalance.Strategy != nil { + Logger.Println("Deprecation warning: Consumer.Group.Rebalance.Strategy exists for historical compatibility" + + " and should not be used. Please use Consumer.Group.Rebalance.GroupStrategies") + } // validate IsolationLevel if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) { @@ -759,8 +772,10 @@ func (c *Config) Validate() error { return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms") case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout: return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout") - case c.Consumer.Group.Rebalance.Strategy == nil: - return ConfigurationError("Consumer.Group.Rebalance.Strategy must not be empty") + case c.Consumer.Group.Rebalance.Strategy == nil && len(c.Consumer.Group.Rebalance.GroupStrategies) == 0: + return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies or Consumer.Group.Rebalance.Strategy must not be empty") + case c.Consumer.Group.Rebalance.Strategy != nil && len(c.Consumer.Group.Rebalance.GroupStrategies) != 0: + return 
ConfigurationError("Consumer.Group.Rebalance.GroupStrategies and Consumer.Group.Rebalance.Strategy cannot be set at the same time") case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond: return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms") case c.Consumer.Group.Rebalance.Retry.Max < 0: @@ -768,6 +783,13 @@ func (c *Config) Validate() error { case c.Consumer.Group.Rebalance.Retry.Backoff < 0: return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0") } + + for _, strategy := range c.Consumer.Group.Rebalance.GroupStrategies { + if strategy == nil { + return ConfigurationError("elements in Consumer.Group.Rebalance.Strategies must not be empty") + } + } + if c.Consumer.Group.InstanceId != "" { if !c.Version.IsAtLeast(V2_3_0_0) { return ConfigurationError("Consumer.Group.InstanceId need Version >= 2.3") diff --git a/consumer_group.go b/consumer_group.go index 7d755eae4..4b3174f44 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -319,6 +319,17 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler return nil, join.Err } + var strategy BalanceStrategy + var ok bool + if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy == nil { + strategy, ok = c.findStrategy(join.GroupProtocol, c.config.Consumer.Group.Rebalance.GroupStrategies) + if !ok { + // this case shouldn't happen in practice, since the leader will choose the protocol + // that all the members support + return nil, fmt.Errorf("unable to find selected strategy: %s", join.GroupProtocol) + } + } + // Prepare distribution plan if we joined as the leader var plan BalanceStrategyPlan var members map[string]ConsumerGroupMemberMetadata @@ -328,14 +339,14 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler return nil, err } - plan, err = c.balance(members) + plan, err = c.balance(strategy, members) if err != nil { return nil, err } } // Sync consumer group - groupRequest, err := 
c.syncGroupRequest(coordinator, members, plan, join.GenerationId) + syncGroupResponse, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId, strategy) if consumerGroupSyncTotal != nil { consumerGroupSyncTotal.Inc(1) } @@ -346,13 +357,13 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler } return nil, err } - if !errors.Is(groupRequest.Err, ErrNoError) { + if !errors.Is(syncGroupResponse.Err, ErrNoError) { if consumerGroupSyncFailed != nil { consumerGroupSyncFailed.Inc(1) } } - switch groupRequest.Err { + switch syncGroupResponse.Err { case ErrNoError: case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately @@ -361,22 +372,22 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress: // retry after backoff if retries <= 0 { - return nil, groupRequest.Err + return nil, syncGroupResponse.Err } return c.retryNewSession(ctx, topics, handler, retries, true) case ErrFencedInstancedId: if c.groupInstanceId != nil { Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId) } - return nil, groupRequest.Err + return nil, syncGroupResponse.Err default: - return nil, groupRequest.Err + return nil, syncGroupResponse.Err } // Retrieve and sort claims var claims map[string][]int32 - if len(groupRequest.MemberAssignment) > 0 { - members, err := groupRequest.GetMemberAssignment() + if len(syncGroupResponse.MemberAssignment) > 0 { + members, err := syncGroupResponse.GetMemberAssignment() if err != nil { return nil, err } @@ -419,26 +430,46 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( Topics: topics, UserData: c.userData, } - strategy := c.config.Consumer.Group.Rebalance.Strategy - if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { - return nil, err + var strategy BalanceStrategy + if strategy = 
c.config.Consumer.Group.Rebalance.Strategy; strategy != nil { + if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { + return nil, err + } + } else { + for _, strategy = range c.config.Consumer.Group.Rebalance.GroupStrategies { + if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { + return nil, err + } + } } return coordinator.JoinGroup(req) } +// findStrategy returns the first BalanceStrategy in groupStrategies whose Name() +// equals name, or (nil, false) when no strategy in the slice matches. +func (c *consumerGroup) findStrategy(name string, groupStrategies []BalanceStrategy) (BalanceStrategy, bool) { + for _, strategy := range groupStrategies { + if strategy.Name() == name { + return strategy, true + } + } + return nil, false +} + func (c *consumerGroup) syncGroupRequest( coordinator *Broker, members map[string]ConsumerGroupMemberMetadata, plan BalanceStrategyPlan, generationID int32, + strategy BalanceStrategy, ) (*SyncGroupResponse, error) { req := &SyncGroupRequest{ GroupId: c.groupID, MemberId: c.memberID, GenerationId: generationID, } - strategy := c.config.Consumer.Group.Rebalance.Strategy + if c.config.Version.IsAtLeast(V2_3_0_0) { req.Version = 3 } @@ -481,7 +512,7 @@ func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, g return coordinator.Heartbeat(req) } -func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) { +func (c *consumerGroup) balance(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) { topics := make(map[string][]int32) for _, meta := range members { for _, topic := range meta.Topics { @@ -497,7 +528,6 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) topics[topic] = partitions } - strategy := c.config.Consumer.Group.Rebalance.Strategy return strategy.Plan(members, topics) } diff --git a/consumer_group_test.go b/consumer_group_test.go index d7bb17536..7a69da382 100644 --- 
a/consumer_group_test.go +++ b/consumer_group_test.go @@ -49,7 +49,7 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) { "HeartbeatRequest": NewMockHeartbeatResponse(t), "JoinGroupRequest": NewMockSequence( NewMockJoinGroupResponse(t).SetError(ErrOffsetsLoadInProgress), - NewMockJoinGroupResponse(t), + NewMockJoinGroupResponse(t).SetGroupProtocol(RangeBalanceStrategyName), ), "SyncGroupRequest": NewMockSequence( NewMockSyncGroupResponse(t).SetError(ErrOffsetsLoadInProgress), diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go index 88edd738c..d7d2f916b 100644 --- a/examples/consumergroup/main.go +++ b/examples/consumergroup/main.go @@ -70,11 +70,11 @@ func main() { switch assignor { case "sticky": - config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky} case "roundrobin": - config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin} case "range": - config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange} default: log.Panicf("Unrecognized consumer group partition assignor: %s", assignor) } diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index 59f31939c..6be383f0c 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -59,7 +59,7 @@ func TestFuncConsumerGroupPartitioningStateful(t *testing.T) { m1s := newTestStatefulStrategy(t) config := defaultConfig("M1") - config.Consumer.Group.Rebalance.Strategy = m1s + config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{m1s} config.Consumer.Group.Member.UserData = []byte(config.ClientID) // start M1 @@ -72,7 +72,7 @@ func 
TestFuncConsumerGroupPartitioningStateful(t *testing.T) { m2s := newTestStatefulStrategy(t) config = defaultConfig("M2") - config.Consumer.Group.Rebalance.Strategy = m2s + config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{m2s} config.Consumer.Group.Member.UserData = []byte(config.ClientID) // start M2