From 5eb6bebb154cf751ab391800e38fa5ba361a7271 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Tue, 27 Sep 2022 12:04:54 +0100 Subject: [PATCH 1/3] Merge pull request #2339 from Jacob-bzx/Jacob-bzx/support-multiple-balance-strategies feat(consumer): support multiple balance strategies --- config.go | 27 +++++++++++-- consumer_group.go | 66 +++++++++++++++++++++++-------- examples/consumergroup/main.go | 6 +-- functional_consumer_group_test.go | 4 +- 4 files changed, 79 insertions(+), 24 deletions(-) diff --git a/config.go b/config.go index a2e55af04..3f7fa9daa 100644 --- a/config.go +++ b/config.go @@ -270,7 +270,16 @@ type Config struct { } Rebalance struct { // Strategy for allocating topic partitions to members (default BalanceStrategyRange) + // Deprecated: Strategy exists for historical compatibility + // and should not be used. Please use GroupStrategies. Strategy BalanceStrategy + + // GroupStrategies is the priority-ordered list of client-side consumer group + // balancing strategies that will be offered to the coordinator. The first + // strategy that all group members support will be chosen by the leader. + // default: [BalanceStrategyRange] + GroupStrategies []BalanceStrategy + // The maximum allowed time for each worker to join the group once a rebalance has begun. // This is basically a limit on the amount of time needed for all tasks to flush any pending // data and commit offsets. If the timeout is exceeded, then the worker will be removed from @@ -498,7 +507,7 @@ func NewConfig() *Config { c.Consumer.Group.Session.Timeout = 10 * time.Second c.Consumer.Group.Heartbeat.Interval = 3 * time.Second - c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange + c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{BalanceStrategyRange} c.Consumer.Group.Rebalance.Timeout = 60 * time.Second c.Consumer.Group.Rebalance.Retry.Max = 4 c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second @@ -737,6 +746,10 @@ func (c *Config) Validate() error { Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" + " and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored") } + if c.Consumer.Group.Rebalance.Strategy != nil { + Logger.Println("Deprecation warning: Consumer.Group.Rebalance.Strategy exists for historical compatibility" + + " and should not be used. Please use Consumer.Group.Rebalance.GroupStrategies") + } // validate IsolationLevel if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) { @@ -751,8 +764,10 @@ func (c *Config) Validate() error { return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms") case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout: return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout") - case c.Consumer.Group.Rebalance.Strategy == nil: - return ConfigurationError("Consumer.Group.Rebalance.Strategy must not be empty") + case c.Consumer.Group.Rebalance.Strategy == nil && len(c.Consumer.Group.Rebalance.GroupStrategies) == 0: + return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies or Consumer.Group.Rebalance.Strategy must not be empty") + case c.Consumer.Group.Rebalance.Strategy != nil && len(c.Consumer.Group.Rebalance.GroupStrategies) != 0: + return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies and Consumer.Group.Rebalance.Strategy cannot be set at the same time") case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond: return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms") case c.Consumer.Group.Rebalance.Retry.Max < 0: @@ -761,6 +776,12 @@ func (c *Config) Validate() error { return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0") } + for _, strategy := range c.Consumer.Group.Rebalance.GroupStrategies { + if strategy == nil { + return ConfigurationError("elements in Consumer.Group.Rebalance.Strategies must not be empty") + } + } + // validate misc shared values switch { case c.ChannelBufferSize < 0: diff --git a/consumer_group.go b/consumer_group.go index 5a8d3072c..8e738e912 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -269,6 +269,17 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler return nil, join.Err } + var strategy BalanceStrategy + var ok bool + if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy == nil { + strategy, ok = c.findStrategy(join.GroupProtocol, c.config.Consumer.Group.Rebalance.GroupStrategies) + if !ok { + // this case shouldn't happen in practice, since the leader will choose the protocol + // that all the members support + return nil, fmt.Errorf("unable to find selected strategy: %s", join.GroupProtocol) + } + } + // Prepare distribution plan if we joined as the leader var plan BalanceStrategyPlan if join.LeaderId == join.MemberId { @@ -277,14 +288,14 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler return nil, err } - plan, err = c.balance(members) + plan, err = c.balance(strategy, members) if err != nil { return nil, err } } // Sync consumer group - groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId) + syncGroupResponse, err := c.syncGroupRequest(coordinator, plan, join.GenerationId, strategy) if consumerGroupSyncTotal != nil { consumerGroupSyncTotal.Inc(1) } @@ -295,37 +306,37 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler } return nil, err } - if groupRequest.Err != ErrNoError { + if !errors.Is(syncGroupResponse.Err, ErrNoError) { if consumerGroupSyncFailed != nil { consumerGroupSyncFailed.Inc(1) } } - switch groupRequest.Err { + switch syncGroupResponse.Err { case ErrNoError: case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately c.memberID = "" return c.newSession(ctx, topics, handler, retries) case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh if retries <= 0 { - return nil, groupRequest.Err + return nil, syncGroupResponse.Err } return c.retryNewSession(ctx, topics, handler, retries, true) case ErrRebalanceInProgress: // retry after backoff if retries <= 0 { - return nil, groupRequest.Err + return nil, syncGroupResponse.Err } return c.retryNewSession(ctx, topics, handler, retries, false) default: - return nil, groupRequest.Err + return nil, syncGroupResponse.Err } // Retrieve and sort claims var claims map[string][]int32 - if len(groupRequest.MemberAssignment) > 0 { - members, err := groupRequest.GetMemberAssignment() + if len(syncGroupResponse.MemberAssignment) > 0 { + members, err := syncGroupResponse.GetMemberAssignment() if err != nil { return nil, err } @@ -364,21 +375,45 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( Topics: topics, UserData: c.userData, } - strategy := c.config.Consumer.Group.Rebalance.Strategy - if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { - return nil, err + var strategy BalanceStrategy + if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy != nil { + if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { + return nil, err + } + } else { + for _, strategy = range c.config.Consumer.Group.Rebalance.GroupStrategies { + if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { + return nil, err + } + } } return coordinator.JoinGroup(req) } -func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) { +// findStrategy returns the BalanceStrategy with the specified protocolName +// from the slice provided. +func (c *consumerGroup) findStrategy(name string, groupStrategies []BalanceStrategy) (BalanceStrategy, bool) { + for _, strategy := range groupStrategies { + if strategy.Name() == name { + return strategy, true + } + } + return nil, false +} + +func (c *consumerGroup) syncGroupRequest( + coordinator *Broker, + plan BalanceStrategyPlan, + generationID int32, + strategy BalanceStrategy, +) (*SyncGroupResponse, error) { req := &SyncGroupRequest{ GroupId: c.groupID, MemberId: c.memberID, GenerationId: generationID, } - strategy := c.config.Consumer.Group.Rebalance.Strategy + for memberID, topics := range plan { assignment := &ConsumerGroupMemberAssignment{Topics: topics} userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID) @@ -403,7 +438,7 @@ func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, g return coordinator.Heartbeat(req) } -func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) { +func (c *consumerGroup) balance(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) { topics := make(map[string][]int32) for _, meta := range members { for _, topic := range meta.Topics { @@ -419,7 +454,6 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) topics[topic] = partitions } - strategy := c.config.Consumer.Group.Rebalance.Strategy return strategy.Plan(members, topics) } diff --git a/examples/consumergroup/main.go b/examples/consumergroup/main.go index 53ed143f6..fd0b47c34 100644 --- a/examples/consumergroup/main.go +++ b/examples/consumergroup/main.go @@ -68,11 +68,11 @@ func main() { switch assignor { case "sticky": - config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky} case "roundrobin": - config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin} case "range": - config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange} default: log.Panicf("Unrecognized consumer group partition assignor: %s", assignor) } diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go index 13ed2e935..c1e9809f0 100644 --- a/functional_consumer_group_test.go +++ b/functional_consumer_group_test.go @@ -58,7 +58,7 @@ func TestFuncConsumerGroupPartitioningStateful(t *testing.T) { m1s := newTestStatefulStrategy(t) config := defaultConfig("M1") - config.Consumer.Group.Rebalance.Strategy = m1s + config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{m1s} config.Consumer.Group.Member.UserData = []byte(config.ClientID) // start M1 @@ -71,7 +71,7 @@ func TestFuncConsumerGroupPartitioningStateful(t *testing.T) { m2s := newTestStatefulStrategy(t) config = defaultConfig("M2") - config.Consumer.Group.Rebalance.Strategy = m2s + config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{m2s} config.Consumer.Group.Member.UserData = []byte(config.ClientID) // start M2 From 90eea1a78a88d9bfd21b02449a9aaf7b8b191564 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Calixte?= Date: Fri, 1 Dec 2023 12:24:58 +0100 Subject: [PATCH 2/3] bump go version --- .github/workflows/fuzz.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/fuzz.yml b/.github/workflows/fuzz.yml index a9197fc23..2cd77fb8d 100644 --- a/.github/workflows/fuzz.yml +++ b/.github/workflows/fuzz.yml @@ -20,7 +20,7 @@ jobs: - name: Setup Go uses: actions/setup-go@v2 with: - go-version: 1.17.x + go-version: 1.21.x - name: Get Go environment id: go-env From 79f41212be60ac4a12f395ede4e44aadb0bdc8ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Calixte?= Date: Fri, 1 Dec 2023 14:11:26 +0100 Subject: [PATCH 3/3] nolint:gocyclo for Config.Validate --- config.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/config.go b/config.go index 3f7fa9daa..340406a4d 100644 --- a/config.go +++ b/config.go @@ -524,6 +524,8 @@ func NewConfig() *Config { // Validate checks a Config instance. It will return a // ConfigurationError if the specified values don't make sense. +// +//nolint:gocyclo // This function's cyclomatic complexity has go beyond 100 func (c *Config) Validate() error { // some configuration values should be warned on but not fail completely, do those first if !c.Net.TLS.Enable && c.Net.TLS.Config != nil {