Skip to content

Commit

Permalink
fix: comments and logs
Browse files Browse the repository at this point in the history
Signed-off-by: napallday <bzx0619@gmail.com>
  • Loading branch information
napallday committed Aug 22, 2023
1 parent ada6a47 commit 29eaac3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
16 changes: 11 additions & 5 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,12 @@ func (c *consumerGroup) start(ctx context.Context, topics []string, handlerV2 Co
newAssignedPartitions := diffAssignment(claims, c.ownedPartitions)
if c.protocol == COOPERATIVE {
revokedPartitions := diffAssignment(c.ownedPartitions, claims)
Logger.Printf("updating consumer(group:%s, member:%s, generation:%d)\n"+
Logger.Printf("updating consumer(group:%s, member:%s, generation:%d, isLeader:%v)\n"+
"All Assignments: %v\n"+
"New Partitions: %v\n"+
"Revoked Partitions: %v\n",
c.groupID, c.memberID, c.generationID, claims, newAssignedPartitions, revokedPartitions)
c.groupID, c.memberID, c.generationID, c.isLeader,
claims, newAssignedPartitions, revokedPartitions)

if len(revokedPartitions) > 0 {
err = c.revokedPartitions(revokedPartitions)
Expand Down Expand Up @@ -410,7 +411,8 @@ func (c *consumerGroup) start(ctx context.Context, topics []string, handlerV2 Co
cancelFunc()
case <-ctx.Done():
}
Logger.Printf("consumer(group:%s, member:%s, generation:%d) context is done", c.groupID, c.memberID, c.generationID)
Logger.Printf("consumer(group:%s, member:%s, generation:%d, isLeader:%v) context is done\n",
c.groupID, c.memberID, c.generationID, c.isLeader)

// if using EAGER rebalance protocol, we need to revoke all owned partitions before sending new JoinGroupRequest
if c.protocol == EAGER {
Expand Down Expand Up @@ -1095,7 +1097,8 @@ func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int
}

func (c *consumerGroup) revokedPartitions(revokedPartitions map[string][]int32) error {
Logger.Printf("revoking partitions: %v", c.groupID, revokedPartitions)
Logger.Printf("consumer(group:%s, member:%s, generation:%d, isLeader:%v) revoking partitions: %v\n",
c.groupID, c.memberID, c.generationID, c.isLeader, revokedPartitions)

// close revoked partition consumers
c.removeClaims(revokedPartitions)
Expand All @@ -1121,13 +1124,15 @@ func (c *consumerGroup) revokedOwnedPartitions() {
}

func (c *consumerGroup) startNewPartitions(newAssignedPartitions map[string][]int32) error {
Logger.Printf("consumer(group:%s, member:%s, generation:%d, isLeader:%v) starting new assigned partitions: %v",
c.groupID, c.memberID, c.generationID, c.isLeader, newAssignedPartitions)

// create partition offset managers for each new assigned partitions
for topic, partitions := range newAssignedPartitions {
for _, partition := range partitions {
pom, err := c.offsetManager.ManagePartition(topic, partition)
if err != nil {
Logger.Printf("unable to create partition offset manager for %s/%d, err: %v", topic, partition, err)
// todo maybe close all offset managers here
return err
}

Expand Down Expand Up @@ -1562,6 +1567,7 @@ type ConsumerGroupHandlerV2 interface {
// Setup runs at the beginning of setting up new assigned partitions, before ConsumeClaim.
// For EAGER rebalance strategy, this is to set up all assigned partitions.
// For COOPERATIVE rebalance strategy, this is only to set up new assigned partitions.
// Note that even if there are no new assigned partitions, this method will still be called after rebalance.
Setup(offsetManger OffsetManager, newAssignedPartitions map[string][]int32)

// Cleanup runs after ConsumeClaim, but before the offsets are committed for the claim.
Expand Down
7 changes: 3 additions & 4 deletions examples/consumergroup_cooperative/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ func init() {
flag.StringVar(&assignors, "assignors", "cooperative-sticky", "Consumer group partition assignment strategies (range, roundrobin, sticky, cooperative-sticky)")
flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
// flag string slice
// https://stackoverflow.com/questions/1752414/how-to-receive-variable-number-of-arguments-as-a-command-line-flag

flag.Parse()

Expand Down Expand Up @@ -170,15 +168,16 @@ type Consumer struct{}
// Setup runs at the beginning of setting up new assigned partitions, before ConsumeClaim.
// For EAGER rebalance strategy, this is to set up all assigned partitions.
// For COOPERATIVE rebalance strategy, this is only to set up new assigned partitions.
// Note that even if there are no new assigned partitions, this method will still be called after rebalance.
func (consumer *Consumer) Setup(offsetManger sarama.OffsetManager, newAssignedPartitions map[string][]int32) {
log.Printf("newAssignedPartitions: %v", newAssignedPartitions)
log.Printf("[Setup] newAssignedPartitions: %v", newAssignedPartitions)
}

// Cleanup runs after ConsumeClaim, but before the offsets are committed for the claim.
// For EAGER rebalance strategy, this is to clean up all assigned partitions.
// For COOPERATIVE rebalance strategy, this is only to clean up revoked partitions.
func (consumer *Consumer) Cleanup(offsetManger sarama.OffsetManager, revokedPartitions map[string][]int32) {
log.Printf("revokedPartitions: %v", revokedPartitions)
log.Printf("[Cleanup] revokedPartitions: %v", revokedPartitions)
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
Expand Down

0 comments on commit 29eaac3

Please sign in to comment.