Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consumer): support multiple balance strategies #2339

Merged
merged 1 commit into from Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 25 additions & 3 deletions config.go
Expand Up @@ -273,7 +273,16 @@ type Config struct {
}
Rebalance struct {
// Strategy for allocating topic partitions to members (default BalanceStrategyRange)
// Deprecated: Strategy exists for historical compatibility
// and should not be used. Please use GroupStrategies.
Strategy BalanceStrategy

// GroupStrategies is the priority-ordered list of client-side consumer group
// balancing strategies that will be offered to the coordinator. The first
// strategy that all group members support will be chosen by the leader.
// default: [BalanceStrategyRange]
GroupStrategies []BalanceStrategy

// The maximum allowed time for each worker to join the group once a rebalance has begun.
// This is basically a limit on the amount of time needed for all tasks to flush any pending
// data and commit offsets. If the timeout is exceeded, then the worker will be removed from
Expand Down Expand Up @@ -504,7 +513,7 @@ func NewConfig() *Config {

c.Consumer.Group.Session.Timeout = 10 * time.Second
c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{BalanceStrategyRange}
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
c.Consumer.Group.Rebalance.Retry.Max = 4
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
Expand Down Expand Up @@ -745,6 +754,10 @@ func (c *Config) Validate() error {
Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
" and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
}
if c.Consumer.Group.Rebalance.Strategy != nil {
Logger.Println("Deprecation warning: Consumer.Group.Rebalance.Strategy exists for historical compatibility" +
" and should not be used. Please use Consumer.Group.Rebalance.GroupStrategies")
}

// validate IsolationLevel
if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
Expand All @@ -759,15 +772,24 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms")
case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout:
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout")
case c.Consumer.Group.Rebalance.Strategy == nil:
return ConfigurationError("Consumer.Group.Rebalance.Strategy must not be empty")
case c.Consumer.Group.Rebalance.Strategy == nil && len(c.Consumer.Group.Rebalance.GroupStrategies) == 0:
return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies or Consumer.Group.Rebalance.Strategy must not be empty")
case c.Consumer.Group.Rebalance.Strategy != nil && len(c.Consumer.Group.Rebalance.GroupStrategies) != 0:
return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies and Consumer.Group.Rebalance.Strategy cannot be set at the same time")
case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond:
return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms")
case c.Consumer.Group.Rebalance.Retry.Max < 0:
return ConfigurationError("Consumer.Group.Rebalance.Retry.Max must be >= 0")
case c.Consumer.Group.Rebalance.Retry.Backoff < 0:
return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
}

for _, strategy := range c.Consumer.Group.Rebalance.GroupStrategies {
if strategy == nil {
return ConfigurationError("elements in Consumer.Group.Rebalance.Strategies must not be empty")
}
}

if c.Consumer.Group.InstanceId != "" {
if !c.Version.IsAtLeast(V2_3_0_0) {
return ConfigurationError("Consumer.Group.InstanceId need Version >= 2.3")
Expand Down
60 changes: 45 additions & 15 deletions consumer_group.go
Expand Up @@ -319,6 +319,17 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return nil, join.Err
}

var strategy BalanceStrategy
var ok bool
if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy == nil {
strategy, ok = c.findStrategy(join.GroupProtocol, c.config.Consumer.Group.Rebalance.GroupStrategies)
if !ok {
// this case shouldn't happen in practice, since the leader will choose the protocol
// that all the members support
return nil, fmt.Errorf("unable to find selected strategy: %s", join.GroupProtocol)
}
}

// Prepare distribution plan if we joined as the leader
var plan BalanceStrategyPlan
var members map[string]ConsumerGroupMemberMetadata
Expand All @@ -328,14 +339,14 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return nil, err
}

plan, err = c.balance(members)
plan, err = c.balance(strategy, members)
if err != nil {
return nil, err
}
}

// Sync consumer group
groupRequest, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId)
syncGroupResponse, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId, strategy)
if consumerGroupSyncTotal != nil {
consumerGroupSyncTotal.Inc(1)
}
Expand All @@ -346,13 +357,13 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
}
return nil, err
}
if !errors.Is(groupRequest.Err, ErrNoError) {
if !errors.Is(syncGroupResponse.Err, ErrNoError) {
if consumerGroupSyncFailed != nil {
consumerGroupSyncFailed.Inc(1)
}
}

switch groupRequest.Err {
switch syncGroupResponse.Err {
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration:
// reset member ID and retry immediately
Expand All @@ -361,22 +372,22 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
// retry after backoff
if retries <= 0 {
return nil, groupRequest.Err
return nil, syncGroupResponse.Err
}
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrFencedInstancedId:
if c.groupInstanceId != nil {
Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
}
return nil, groupRequest.Err
return nil, syncGroupResponse.Err
default:
return nil, groupRequest.Err
return nil, syncGroupResponse.Err
}

// Retrieve and sort claims
var claims map[string][]int32
if len(groupRequest.MemberAssignment) > 0 {
members, err := groupRequest.GetMemberAssignment()
if len(syncGroupResponse.MemberAssignment) > 0 {
members, err := syncGroupResponse.GetMemberAssignment()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -419,26 +430,46 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
Topics: topics,
UserData: c.userData,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
return nil, err
var strategy BalanceStrategy
if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy != nil {
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
return nil, err
}
} else {
for _, strategy = range c.config.Consumer.Group.Rebalance.GroupStrategies {
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
return nil, err
}
}
}

return coordinator.JoinGroup(req)
}

// findStrategy returns the BalanceStrategy with the specified protocolName
// from the slice provided.
func (c *consumerGroup) findStrategy(name string, groupStrategies []BalanceStrategy) (BalanceStrategy, bool) {
for _, strategy := range groupStrategies {
if strategy.Name() == name {
return strategy, true
}
}
return nil, false
}

func (c *consumerGroup) syncGroupRequest(
coordinator *Broker,
members map[string]ConsumerGroupMemberMetadata,
plan BalanceStrategyPlan,
generationID int32,
strategy BalanceStrategy,
) (*SyncGroupResponse, error) {
req := &SyncGroupRequest{
GroupId: c.groupID,
MemberId: c.memberID,
GenerationId: generationID,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy

if c.config.Version.IsAtLeast(V2_3_0_0) {
req.Version = 3
}
Expand Down Expand Up @@ -481,7 +512,7 @@ func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, g
return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
func (c *consumerGroup) balance(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
topics := make(map[string][]int32)
for _, meta := range members {
for _, topic := range meta.Topics {
Expand All @@ -497,7 +528,6 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata)
topics[topic] = partitions
}

strategy := c.config.Consumer.Group.Rebalance.Strategy
return strategy.Plan(members, topics)
}

Expand Down
2 changes: 1 addition & 1 deletion consumer_group_test.go
Expand Up @@ -49,7 +49,7 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
"HeartbeatRequest": NewMockHeartbeatResponse(t),
"JoinGroupRequest": NewMockSequence(
NewMockJoinGroupResponse(t).SetError(ErrOffsetsLoadInProgress),
NewMockJoinGroupResponse(t),
NewMockJoinGroupResponse(t).SetGroupProtocol(RangeBalanceStrategyName),
),
"SyncGroupRequest": NewMockSequence(
NewMockSyncGroupResponse(t).SetError(ErrOffsetsLoadInProgress),
Expand Down
6 changes: 3 additions & 3 deletions examples/consumergroup/main.go
Expand Up @@ -70,11 +70,11 @@ func main() {

switch assignor {
case "sticky":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky}
case "roundrobin":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
case "range":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange}
default:
log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
}
Expand Down
4 changes: 2 additions & 2 deletions functional_consumer_group_test.go
Expand Up @@ -59,7 +59,7 @@ func TestFuncConsumerGroupPartitioningStateful(t *testing.T) {

m1s := newTestStatefulStrategy(t)
config := defaultConfig("M1")
config.Consumer.Group.Rebalance.Strategy = m1s
config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{m1s}
config.Consumer.Group.Member.UserData = []byte(config.ClientID)

// start M1
Expand All @@ -72,7 +72,7 @@ func TestFuncConsumerGroupPartitioningStateful(t *testing.T) {

m2s := newTestStatefulStrategy(t)
config = defaultConfig("M2")
config.Consumer.Group.Rebalance.Strategy = m2s
config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{m2s}
config.Consumer.Group.Member.UserData = []byte(config.ClientID)

// start M2
Expand Down