admin.AlterConsumerGroupOffsets not working for existing consumer group with multiple topics #1138

Open
mwufigma opened this issue Feb 20, 2024 · 8 comments

@mwufigma

mwufigma commented Feb 20, 2024

I have a consumer group that contains several single-partition topics. There are times when I want to create a new consumer in this group, but have it start reading from a specific offset from about a day ago (our message retention definitely covers this). My approach is to create a dummy consumer, call `admin, err := kafka.NewAdminClientFromConsumer(consumer)` to create the admin instance, and then call `resetResult, err := admin.AlterConsumerGroupOffsets(ctx, consumerGroupTopicPartitions)` to reset to the desired offset before creating the consumer that needs to read from it. This works fine when the consumer group does not yet exist or has no consumers in it, but fails with the error "[Broker: Unknown member]" when a consumer already exists. Here is a code snippet:

	consumer, err := connections.NewKafkaClient(groupID)
	if err != nil {
		log.Errorf("Failed to initiate Kafka client: %v", err)
		return err
	}
	err = consumer.Subscribe(topic, nil)
	if err != nil {
		log.Warnf("Failed to subscribe to topic [%s]: %v", topic, err)
	}
	defer consumer.Close()

	admin, err := kafka.NewAdminClientFromConsumer(consumer)
	if err != nil {
		log.Errorf("Failed to initiate Kafka admin client: %v", err)
		return err
	}
	defer admin.Close()

	ctx := context.Background()
	cgtps := make([]kafka.TopicPartition, 0, len(resetOffsets))
	for _, offset := range resetOffsets {
		cgtps = append(cgtps, kafka.TopicPartition{
			Topic:     &topic,
			Partition: offset.Partition,
			Offset:    offset.Offset,
		})
	}

	consumerGroupTopicPartitions := []kafka.ConsumerGroupTopicPartitions{
		{
			Group:      groupID,
			Partitions: cgtps,
		},
	}

	resetResult, err := admin.AlterConsumerGroupOffsets(ctx, consumerGroupTopicPartitions)

Some additional context is that the consumer group / consumers we want to be manipulating offset for are managed by MSK Connect.

@mwufigma
Author

@emasab @milindl @vsantwana if anyone can give any suggestions on how to work around this!

@milindl
Contributor

milindl commented Feb 20, 2024

If you are trying to modify the offsets for a topic T and consumer group G, it will not work if there are any active consumers inside G which are subscribed to T.
That seems to be similar to what you're describing in this situation, is that accurate?

@mwufigma
Author

mwufigma commented Feb 20, 2024

> If you are trying to modify the offsets for a topic T and consumer group G, it will not work if there are any active consumers inside G which are subscribed to T. That seems to be similar to what you're describing in this situation, is that accurate?

@milindl Hm that might be the case here, but previously I didn't subscribe the consumer to any topic T in consumer group G, tried to alter offsets for T in G and it didn't work, causing it to throw the same error. I subscribed the consumer to topic T because I read somewhere that "Committed offsets will be rejected for members not subscribed to the topic being committed to". And because I'm creating the admin client from the consumer (admin, err := kafka.NewAdminClientFromConsumer(consumer)), I hypothesized that the consumer that the admin client was initialized with needs to be subscribed to the topic.

To test something else out, I think I can follow the example here: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/admin_alter_consumer_group_offsets/admin_alter_consumer_group_offsets.go and create the admin client without a consumer and just with bootstrap servers. Not sure what difference that would make, if you could help shed some light on this.

@mwufigma
Author

Edit: it still doesn't work with this snippet:

	// Create admin client without consumer
	admin, err := kafka.NewAdminClient(kafkaCfgMap)
	if err != nil {
		log.Errorf("Failed to initiate Kafka admin client: %v", err)
		return err
	}
	defer admin.Close()

	ctx := context.Background()
	cgtps := make([]kafka.TopicPartition, 0, len(resetOffsets))
	for _, offset := range resetOffsets {
		cgtps = append(cgtps, kafka.TopicPartition{
			Topic:     &topic,
			Partition: offset.Partition,
			Offset:    offset.Offset,
		})
	}

	consumerGroupTopicPartitions := []kafka.ConsumerGroupTopicPartitions{
		{
			Group:      groupID,
			Partitions: cgtps,
		},
	}

	resetResult, err := admin.AlterConsumerGroupOffsets(ctx, consumerGroupTopicPartitions)

I confirmed that there were no active consumers:

kafka-consumer-groups.sh --bootstrap-server <servers> --describe --group connect-entity-sync-v2-cluster-4

| GROUP                            | TOPIC     | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG    | CONSUMER-ID                                                                        | CLIENT-ID                                     |
|----------------------------------|-----------|-----------|----------------|----------------|--------|------------------------------------------------------------------------------------|-----------------------------------------------|
| connect-entity-sync-v2-cluster-4 | db.topic1 | 0         | 106651680      | 106651981      | 301    | connector-consumer-entity-sync-v2-cluster-4-0-4db28142-aede-44d3-81da-d7545b285091 | connector-consumer-entity-sync-v2-cluster-4-0 |
| connect-entity-sync-v2-cluster-4 | db.topic2 | 0         | 104154940      | 105154403      | 999463 | -                                                                                  | -                                             |
| connect-entity-sync-v2-cluster-4 | db.topic3 | 0         | 64710          | 64862          | 152    | -                                                                                  | -                                             |

@milindl
Contributor

milindl commented Feb 21, 2024

@mwufigma In the snippet attached, does the cgtps slice contain any topic partition where topic is "db.topic1"? That is expected to cause problems.

If not - I can take a look and reproduce it.

In the Java client documentation, I notice that it says "Alters offsets for the specified group. In order to succeed, the group must be empty." It doesn't have the requirement for the topic T at all, it wants the consumer group to be empty.

In that case, we'll need to update our documentation.

@mwufigma
Author

> In the snippet attached, does the cgtps slice contain any topic partition where topic is "db.topic1"? That is expected to cause problems.

@milindl Nope, I just checked: it only contained "db.topic2", which didn't have any active consumers.

> In the Java client documentation, I notice that it says "Alters offsets for the specified group. In order to succeed, the group must be empty." It doesn't have the requirement for the topic T at all, it wants the consumer group to be empty.

If this is the case, then would the solution be to temporarily disconnect all consumers (for all topics, not just the one I'm altering offset for), alter the offsets and then reconnect the consumers + the new one?

@mwufigma
Author

Update: the above solution worked -- if I disconnect all consumers (AKA delete my MSK Connector), alter offsets and re-create the cluster, MSK Connector consumes starting from the desired offset. Thank you for your help @milindl!

@milindl
Contributor

milindl commented Feb 21, 2024

Thanks for checking the solution. I will update the documentation to reflect this across our clients, to match the Java documentation.

One thing though, that might help you further: deleting consumers and recreating them is quite expensive, so for 'but have it read starting from a specific offset from about a day ago', why don't you use a rebalance handler in the newly joined consumer?

When you are assigned new partitions, you can use the Assign() method of the consumer within the rebalance handler to seek to the offsets you want to read from. Something similar to this example: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/consumer_rebalance_example/consumer_rebalance_example.go#L163
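
A rough sketch of the offset-override step inside such a rebalance handler, written with plain Go types so it runs without a broker. `topicPartition`, `offsetUnset`, `partitionKey` and `withStartOffsets` are hypothetical names for this illustration, and the offset value is made up. In a real handler the input would be the partitions from the assignment event, and the returned slice is what would be passed to the consumer's `Assign()`.

```go
package main

import "fmt"

// topicPartition is a hypothetical, simplified stand-in for kafka.TopicPartition.
type topicPartition struct {
	Topic     string
	Partition int32
	Offset    int64
}

// offsetUnset is a hypothetical placeholder meaning "no explicit start offset".
const offsetUnset int64 = -1

// partitionKey builds the lookup key used by the desired-offset map.
func partitionKey(topic string, partition int32) string {
	return fmt.Sprintf("%s/%d", topic, partition)
}

// withStartOffsets returns a copy of the assigned partitions in which every
// partition that has an entry in desired starts from that offset. Partitions
// without an entry are returned unchanged.
func withStartOffsets(assigned []topicPartition, desired map[string]int64) []topicPartition {
	out := make([]topicPartition, len(assigned))
	copy(out, assigned)
	for i := range out {
		if off, ok := desired[partitionKey(out[i].Topic, out[i].Partition)]; ok {
			out[i].Offset = off
		}
	}
	return out
}

func main() {
	// Partitions as they might arrive in the rebalance handler.
	assigned := []topicPartition{
		{Topic: "db.topic2", Partition: 0, Offset: offsetUnset},
		{Topic: "db.topic3", Partition: 0, Offset: offsetUnset},
	}

	// Hypothetical offset from about a day ago for db.topic2 only.
	desired := map[string]int64{partitionKey("db.topic2", 0): 104000000}

	// In a real handler this slice would be handed to the consumer's Assign().
	for _, tp := range withStartOffsets(assigned, desired) {
		fmt.Printf("%s[%d] starts at %d\n", tp.Topic, tp.Partition, tp.Offset)
	}
}
```

Working on a copy keeps the partitions delivered with the event untouched, which makes the override easy to test in isolation.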
