Skip to content

Commit

Permalink
fix(proto): decode ConsumerGroupMetadata in cooperative sticky balanc…
Browse files Browse the repository at this point in the history
…e strategy

Signed-off-by: napallday <bzx0619@gmail.com>
  • Loading branch information
napallday committed Aug 23, 2023
1 parent 29eaac3 commit 5214325
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 24 deletions.
70 changes: 57 additions & 13 deletions balance_strategy.go
Expand Up @@ -155,9 +155,11 @@ func NewBalanceStrategySticky() BalanceStrategy {
var BalanceStrategySticky = NewBalanceStrategySticky()

func NewBalanceStrategyCooperativeSticky() BalanceStrategy {
return &cooperativeStickyBalanceStrategy{
cs := &cooperativeStickyBalanceStrategy{
stickyBalanceStrategy: &stickyBalanceStrategy{},
}
cs.stickyBalanceStrategy.parent = cs
return cs
}

// --------------------------------------------------------------------
Expand Down Expand Up @@ -216,6 +218,7 @@ func (s *balanceStrategy) SupportedProtocols() RebalanceProtocolSlice {

type stickyBalanceStrategy struct {
movements partitionMovements
parent *cooperativeStickyBalanceStrategy
}

// Name implements BalanceStrategy.
Expand All @@ -230,7 +233,7 @@ func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetad
}

// prepopulate the current assignment state from userdata on the consumer group members
currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
currentAssignment, prevAssignment, err := s.prepopulateCurrentAssignments(members)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -456,7 +459,9 @@ func computePartitionsTransferringOwnership(members map[string]ConsumerGroupMemb
}

func (cs *cooperativeStickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
return cs.stickyBalanceStrategy.AssignmentData(memberID, topics, generationID)
return encode(&CooperativeStickyAssignorUserDataV0{
Generation: generationID,
}, nil)
}

func (cs *cooperativeStickyBalanceStrategy) SupportedProtocols() RebalanceProtocolSlice {
Expand Down Expand Up @@ -747,7 +752,7 @@ func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscripti
}

// Deserialize topic partition assignment data to aid with creation of a sticky assignment.
func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
func deserializeStickyUserData(userDataBytes []byte) (MemberData, error) {
userDataV1 := &StickyAssignorUserDataV1{}
if err := decode(userDataBytes, userDataV1, nil); err != nil {
userDataV0 := &StickyAssignorUserDataV0{}
Expand All @@ -759,6 +764,37 @@ func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUs
return userDataV1, nil
}

func deserializeCooperativeStickyMemberData(metadata ConsumerGroupMemberMetadata) (MemberData, error) {
// metadata.OwnedPartitions to []topicPartitionAssignment
var partitions []topicPartitionAssignment
for _, ownedPartitions := range metadata.OwnedPartitions {
for _, ownedPartition := range ownedPartitions.Partitions {
partitions = append(partitions, topicPartitionAssignment{Topic: ownedPartitions.Topic, Partition: ownedPartition})
}
}

if metadata.Version >= 2 {
return &CooperativeStickyMemberData{
PartitionsAssignments: partitions,
Generation: metadata.GenerationID,
}, nil
}

userDataV0 := &CooperativeStickyAssignorUserDataV0{}
if err := decode(metadata.UserData, userDataV0, nil); err != nil {
return nil, err
}

memberData := &CooperativeStickyMemberData{
PartitionsAssignments: partitions,
Generation: userDataV0.Generation,
}
if metadata.RackID != nil {
memberData.RackID = *metadata.RackID
}
return memberData, nil
}

// filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
// to those topic partitions currently reported by the Kafka cluster.
func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
Expand Down Expand Up @@ -960,38 +996,46 @@ func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitio
// We need to process subscriptions' user data with each consumer's reported generation in mind
// higher generations overwrite lower generations in case of a conflict
// note that a conflict could exist only if user data is for different generations
func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
func (s *stickyBalanceStrategy) prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
currentAssignment := make(map[string][]topicPartitionAssignment)
prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)

var err error
var memberData MemberData
// for each partition we create a sorted map of its consumers by generation
sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
for memberID, meta := range members {
if len(meta.UserData) == 0 {
continue
}
consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
if s.parent != nil {
// cooperative sticky assignor
memberData, err = deserializeCooperativeStickyMemberData(meta)
} else {
// sticky assignor
memberData, err = deserializeStickyUserData(meta.UserData)
}
if err != nil {
return nil, nil, err
}
for _, partition := range consumerUserData.partitions() {
for _, partition := range memberData.partitions() {
if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
if consumerUserData.hasGeneration() {
if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
if memberData.hasGeneration() {
if _, generationExists := consumers[memberData.generation()]; generationExists {
// same partition is assigned to two consumers during the same rebalance.
// log a warning and skip this record
Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, memberData.generation())
continue
} else {
consumers[consumerUserData.generation()] = memberID
consumers[memberData.generation()] = memberID
}
} else {
consumers[defaultGeneration] = memberID
}
} else {
generation := defaultGeneration
if consumerUserData.hasGeneration() {
generation = consumerUserData.generation()
if memberData.hasGeneration() {
generation = memberData.generation()
}
sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
}
Expand Down
19 changes: 12 additions & 7 deletions balance_strategy_test.go
Expand Up @@ -197,14 +197,14 @@ func TestBalanceStrategyRoundRobin(t *testing.T) {
}
}

func Test_deserializeTopicPartitionAssignment(t *testing.T) {
func Test_deserializeStickyUserData(t *testing.T) {
type args struct {
userDataBytes []byte
}
tests := []struct {
name string
args args
want StickyAssignorUserData
want MemberData
wantErr bool
}{
{
Expand Down Expand Up @@ -271,18 +271,22 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
got, err := deserializeTopicPartitionAssignment(tt.args.userDataBytes)
got, err := deserializeStickyUserData(tt.args.userDataBytes)
if (err != nil) != tt.wantErr {
t.Errorf("deserializeTopicPartitionAssignment() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("deserializeStickyUserData() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("deserializeTopicPartitionAssignment() = %v, want %v", got, tt.want)
t.Errorf("deserializeStickyUserData() = %v, want %v", got, tt.want)
}
})
}
}

func Test_deserializeCooperativeStickyUserData(t *testing.T) {
// todo: add unit test
}

func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) {
strategy := NewBalanceStrategyRoundRobin()

Expand Down Expand Up @@ -443,14 +447,15 @@ func Test_prepopulateCurrentAssignments(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
_, gotPrevAssignments, err := prepopulateCurrentAssignments(tt.args.members)
s := &stickyBalanceStrategy{}
_, gotPrevAssignments, err := s.prepopulateCurrentAssignments(tt.args.members)

if (err != nil) != tt.wantErr {
t.Errorf("prepopulateCurrentAssignments() error = %v, wantErr %v", err, tt.wantErr)
}

if !reflect.DeepEqual(gotPrevAssignments, tt.wantPrevAssignments) {
t.Errorf("deserializeTopicPartitionAssignment() prevAssignments = %v, want %v", gotPrevAssignments, tt.wantPrevAssignments)
t.Errorf("deserializeStickyUserData() prevAssignments = %v, want %v", gotPrevAssignments, tt.wantPrevAssignments)
}
})
}
Expand Down
7 changes: 4 additions & 3 deletions consumer_group.go
Expand Up @@ -819,9 +819,10 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
}

meta := &ConsumerGroupMemberMetadata{
Version: 1,
Topics: topics,
UserData: c.userData,
Version: 2,
Topics: topics,
UserData: c.userData,
GenerationID: c.generationID,
}

for topic, partitions := range c.ownedPartitions {
Expand Down
31 changes: 31 additions & 0 deletions cooperative_sticky_assignor_user_data.go
@@ -0,0 +1,31 @@
package sarama

type CooperativeStickyAssignorUserDataV0 struct {
Generation int32
}

func (m *CooperativeStickyAssignorUserDataV0) encode(pe packetEncoder) error {
pe.putInt32(m.Generation)
return nil
}

func (m *CooperativeStickyAssignorUserDataV0) decode(pd packetDecoder) (err error) {
if m.Generation, err = pd.getInt32(); err != nil {
return
}
return nil
}

type CooperativeStickyMemberData struct {
PartitionsAssignments []topicPartitionAssignment
Generation int32
RackID string
}

func (m *CooperativeStickyMemberData) partitions() []topicPartitionAssignment {
return m.PartitionsAssignments
}
func (m *CooperativeStickyMemberData) hasGeneration() bool { return true }
func (m *CooperativeStickyMemberData) generation() int { return int(m.Generation) }

var _ MemberData = (*CooperativeStickyMemberData)(nil)
12 changes: 12 additions & 0 deletions cooperative_sticky_assignor_user_data_test.go
@@ -0,0 +1,12 @@
package sarama

import (
"testing"
)

func TestCooperativeStickyAssignorUserDataV0(t *testing.T) {
req := &CooperativeStickyAssignorUserDataV0{}
data := decodeUserDataBytes(t, "/////w==") // 0xff 0xff 0xff 0xff
testDecodable(t, "", req, data)
testEncodable(t, "", req, data)
}
2 changes: 1 addition & 1 deletion sticky_assignor_user_data.go
Expand Up @@ -5,7 +5,7 @@ type topicPartitionAssignment struct {
Partition int32
}

type StickyAssignorUserData interface {
type MemberData interface {
partitions() []topicPartitionAssignment
hasGeneration() bool
generation() int
Expand Down

0 comments on commit 5214325

Please sign in to comment.