diff --git a/balance_strategy.go b/balance_strategy.go index eab26b2b6..84f28392e 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -155,9 +155,11 @@ func NewBalanceStrategySticky() BalanceStrategy { var BalanceStrategySticky = NewBalanceStrategySticky() func NewBalanceStrategyCooperativeSticky() BalanceStrategy { - return &cooperativeStickyBalanceStrategy{ + cs := &cooperativeStickyBalanceStrategy{ stickyBalanceStrategy: &stickyBalanceStrategy{}, } + cs.stickyBalanceStrategy.parent = cs + return cs } // -------------------------------------------------------------------- @@ -216,6 +218,7 @@ func (s *balanceStrategy) SupportedProtocols() RebalanceProtocolSlice { type stickyBalanceStrategy struct { movements partitionMovements + parent *cooperativeStickyBalanceStrategy } // Name implements BalanceStrategy. @@ -230,7 +233,7 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad } // prepopulate the current assignment state from userdata on the consumer group members - currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members) + currentAssignment, prevAssignment, err := s.prepopulateCurrentAssignments(members) if err != nil { return nil, err } @@ -456,7 +459,9 @@ func computePartitionsTransferringOwnership(members map[string]ConsumerGroupMemb } func (cs *cooperativeStickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) { - return cs.stickyBalanceStrategy.AssignmentData(memberID, topics, generationID) + return encode(&CooperativeStickyAssignorUserDataV0{ + Generation: generationID, + }, nil) } func (cs *cooperativeStickyBalanceStrategy) SupportedProtocols() RebalanceProtocolSlice { @@ -747,7 +752,7 @@ func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscripti } // Deserialize topic partition assignment data to aid with creation of a sticky assignment. -func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) { +func deserializeStickyUserData(userDataBytes []byte) (MemberData, error) { userDataV1 := &StickyAssignorUserDataV1{} if err := decode(userDataBytes, userDataV1, nil); err != nil { userDataV0 := &StickyAssignorUserDataV0{} @@ -759,6 +764,37 @@ func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUs return userDataV1, nil } +func deserializeCooperativeStickyMemberData(metadata ConsumerGroupMemberMetadata) (MemberData, error) { + // metadata.OwnedPartitions to []topicPartitionAssignment + var partitions []topicPartitionAssignment + for _, ownedPartitions := range metadata.OwnedPartitions { + for _, ownedPartition := range ownedPartitions.Partitions { + partitions = append(partitions, topicPartitionAssignment{Topic: ownedPartitions.Topic, Partition: ownedPartition}) + } + } + + if metadata.Version >= 2 { + return &CooperativeStickyMemberData{ + PartitionsAssignments: partitions, + Generation: metadata.GenerationID, + }, nil + } + + userDataV0 := &CooperativeStickyAssignorUserDataV0{} + if err := decode(metadata.UserData, userDataV0, nil); err != nil { + return nil, err + } + + memberData := &CooperativeStickyMemberData{ + PartitionsAssignments: partitions, + Generation: userDataV0.Generation, + } + if metadata.RackID != nil { + memberData.RackID = *metadata.RackID + } + return memberData, nil +} + // filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited // to those topic partitions currently reported by the Kafka cluster. func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment { @@ -960,38 +996,46 @@ func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitio // We need to process subscriptions' user data with each consumer's reported generation in mind // higher generations overwrite lower generations in case of a conflict // note that a conflict could exist only if user data is for different generations -func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) { +func (s *stickyBalanceStrategy) prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) { currentAssignment := make(map[string][]topicPartitionAssignment) prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair) + var err error + var memberData MemberData // for each partition we create a sorted map of its consumers by generation sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string) for memberID, meta := range members { if len(meta.UserData) == 0 { continue } - consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData) + if s.parent != nil { + // cooperative sticky assignor + memberData, err = deserializeCooperativeStickyMemberData(meta) + } else { + // sticky assignor + memberData, err = deserializeStickyUserData(meta.UserData) + } if err != nil { return nil, nil, err } - for _, partition := range consumerUserData.partitions() { + for _, partition := range memberData.partitions() { if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists { - if consumerUserData.hasGeneration() { - if _, generationExists := consumers[consumerUserData.generation()]; generationExists { + if memberData.hasGeneration() { + if _, generationExists := consumers[memberData.generation()]; generationExists { // same partition is assigned to two consumers during the same rebalance. // log a warning and skip this record - Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation()) + Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, memberData.generation()) continue } else { - consumers[consumerUserData.generation()] = memberID + consumers[memberData.generation()] = memberID } } else { consumers[defaultGeneration] = memberID } } else { generation := defaultGeneration - if consumerUserData.hasGeneration() { - generation = consumerUserData.generation() + if memberData.hasGeneration() { + generation = memberData.generation() } sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID} } diff --git a/balance_strategy_test.go b/balance_strategy_test.go index f397a9073..ab3e10287 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -197,14 +197,14 @@ func TestBalanceStrategyRoundRobin(t *testing.T) { } } -func Test_deserializeTopicPartitionAssignment(t *testing.T) { +func Test_deserializeStickyUserData(t *testing.T) { type args struct { userDataBytes []byte } tests := []struct { name string args args - want StickyAssignorUserData + want MemberData wantErr bool }{ { @@ -271,18 +271,22 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - got, err := deserializeTopicPartitionAssignment(tt.args.userDataBytes) + got, err := deserializeStickyUserData(tt.args.userDataBytes) if (err != nil) != tt.wantErr { - t.Errorf("deserializeTopicPartitionAssignment() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("deserializeStickyUserData() error = %v, wantErr %v", err, tt.wantErr) return } if !reflect.DeepEqual(got, tt.want) { - t.Errorf("deserializeTopicPartitionAssignment() = %v, want %v", got, tt.want) + t.Errorf("deserializeStickyUserData() = %v, want %v", got, tt.want) } }) } } +func Test_deserializeCooperativeStickyUserData(t *testing.T) { + // todo: add unit test +} + func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) { strategy := NewBalanceStrategyRoundRobin() @@ -443,14 +447,15 @@ func Test_prepopulateCurrentAssignments(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - _, gotPrevAssignments, err := prepopulateCurrentAssignments(tt.args.members) + s := &stickyBalanceStrategy{} + _, gotPrevAssignments, err := s.prepopulateCurrentAssignments(tt.args.members) if (err != nil) != tt.wantErr { t.Errorf("prepopulateCurrentAssignments() error = %v, wantErr %v", err, tt.wantErr) } if !reflect.DeepEqual(gotPrevAssignments, tt.wantPrevAssignments) { - t.Errorf("deserializeTopicPartitionAssignment() prevAssignments = %v, want %v", gotPrevAssignments, tt.wantPrevAssignments) + t.Errorf("deserializeStickyUserData() prevAssignments = %v, want %v", gotPrevAssignments, tt.wantPrevAssignments) } }) } diff --git a/consumer_group.go b/consumer_group.go index 476d7d842..82f6eea5e 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -819,9 +819,10 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( } meta := &ConsumerGroupMemberMetadata{ - Version: 1, - Topics: topics, - UserData: c.userData, + Version: 2, + Topics: topics, + UserData: c.userData, + GenerationID: c.generationID, } for topic, partitions := range c.ownedPartitions { diff --git a/cooperative_sticky_assignor_user_data.go b/cooperative_sticky_assignor_user_data.go new file mode 100644 index 000000000..7cf2ccb02 --- /dev/null +++ b/cooperative_sticky_assignor_user_data.go @@ -0,0 +1,31 @@ +package sarama + +type CooperativeStickyAssignorUserDataV0 struct { + Generation int32 +} + +func (m *CooperativeStickyAssignorUserDataV0) encode(pe packetEncoder) error { + pe.putInt32(m.Generation) + return nil +} + +func (m *CooperativeStickyAssignorUserDataV0) decode(pd packetDecoder) (err error) { + if m.Generation, err = pd.getInt32(); err != nil { + return + } + return nil +} + +type CooperativeStickyMemberData struct { + PartitionsAssignments []topicPartitionAssignment + Generation int32 + RackID string +} + +func (m *CooperativeStickyMemberData) partitions() []topicPartitionAssignment { + return m.PartitionsAssignments +} +func (m *CooperativeStickyMemberData) hasGeneration() bool { return true } +func (m *CooperativeStickyMemberData) generation() int { return int(m.Generation) } + +var _ MemberData = (*CooperativeStickyMemberData)(nil) diff --git a/cooperative_sticky_assignor_user_data_test.go b/cooperative_sticky_assignor_user_data_test.go new file mode 100644 index 000000000..b87ec98e2 --- /dev/null +++ b/cooperative_sticky_assignor_user_data_test.go @@ -0,0 +1,12 @@ +package sarama + +import ( + "testing" +) + +func TestCooperativeStickyAssignorUserDataV0(t *testing.T) { + req := &CooperativeStickyAssignorUserDataV0{} + data := decodeUserDataBytes(t, "/////w==") // 0xff 0xff 0xff 0xff + testDecodable(t, "", req, data) + testEncodable(t, "", req, data) +} diff --git a/sticky_assignor_user_data.go b/sticky_assignor_user_data.go index 161233fc3..8df6978d9 100644 --- a/sticky_assignor_user_data.go +++ b/sticky_assignor_user_data.go @@ -5,7 +5,7 @@ type topicPartitionAssignment struct { Partition int32 } -type StickyAssignorUserData interface { +type MemberData interface { partitions() []topicPartitionAssignment hasGeneration() bool generation() int