
commit offset manually when using consumer group #1570

Closed
aditi6622 opened this issue Jan 14, 2020 · 21 comments · Fixed by #1699
Labels
stale/exempt Issues and pull requests that should never be closed as stale

Comments

@aditi6622

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
sarama version: 1.25.0
go version: go1.12.7
Kafka version: 2.2.1

My use case is: I am reading from a topic, so if my consumer fails I should be able to resume from my last successful commit. If I commit on a time interval, there is a chance of message loss. I want to commit manually while using a consumer group, not at intervals.
I went through offset_manager.go, but I am unable to work out how. Kindly help.

@dnwe
Collaborator

dnwe commented Jan 15, 2020

Whilst #1164 did (fairly recently) add support for setting c.Consumer.Offsets.AutoCommit.Enable to false to disable auto-commit of offsets, it seems that it did not expose anything to perform a manual commit.

It seems this could be achieved by extracting the commit parts of flushToBroker out into an exported CommitOffsets func that can be called manually.
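For illustration, here is a fragment showing the knob that #1164 added, together with the still-missing manual-commit call being proposed. This is a sketch, not the final API; the name CommitOffsets is hypothetical, taken from the suggestion above:

```go
cfg := sarama.NewConfig()
// #1164 made the interval-based auto-commit optional:
cfg.Consumer.Offsets.AutoCommit.Enable = false

// What is still missing is an exported entry point to flush on demand,
// e.g. (hypothetical name, per the suggestion above):
//   om.CommitOffsets()
```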

@ghost

ghost commented Apr 15, 2020

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

@ghost ghost added the stale Issues and pull requests without any recent activity label Apr 15, 2020
@dnwe dnwe added stale/exempt Issues and pull requests that should never be closed as stale and removed stale Issues and pull requests without any recent activity labels Apr 15, 2020
@lqiz

lqiz commented Apr 17, 2020

I have also encountered this issue. When will it be fixed?

@lqiz

lqiz commented Apr 21, 2020

Could you help review this fix? Thanks @dnwe
#1678

@knoguchi

knoguchi commented May 9, 2020

After reading #1158 I noticed that PR #1164 didn't even intend to address manual commit. The author of the PR just wanted to disable the commit-by-interval behaviour so it would fit his specific use case of the PartitionOffsetManager. The interval timer isn't even turned off when Enable=false.

#1678 and #1687 don't address manual commit either. The PR partially reverts #1164 (so the commit is by interval again?) and notifies the offset to the consumer group over an async channel.

@wclaeys
Contributor

wclaeys commented May 11, 2020

Please see new attempt at #1699

@dnwe
Collaborator

dnwe commented Jun 24, 2020

As there's been a lot of interest in this functionality, it would be great if all the people watching this issue could test out the now merged PR from @wclaeys with their application code in advance of us cutting any new release version.

Assuming you're using go modules to manage your dependencies, you can update your go.mod to fetch the latest commit from the default branch by doing:

go get github.com/Shopify/sarama@HEAD

@danieldestro

Looks like Sarama version 1.26.4 still does not support manual commit, nor does it flush to the broker after a certain amount of time when Auto Commit is false. Am I correct?

My scenario keeps re-reading old messages when I restart my app with AutoCommit=false, even if I do:
session.MarkMessage(message, "")

When is this going to be fixed? Or, how can I at least manually flush to the broker?

@dnwe
Collaborator

dnwe commented Aug 1, 2020

@danieldestro as mentioned in the comment immediately above yours, this functionality is available on the master branch. Despite a lot of interest in this functionality, we haven't yet had any feedback on whether the merged PR has been tested and satisfies people's requirements. We can cut a new release tag as soon as we've had that confirmation. Can you test out @HEAD?

@danieldestro

danieldestro commented Aug 2, 2020

I have just tested the Sarama version currently on master, regarding the use of the Commit() method to control offsets when Consumer.Offsets.AutoCommit.Enable is false.

But I have a couple of questions here.

Firstly, I thought that when AutoCommit is false, I had to call the session.MarkMessage method to explicitly commit an offset as read. But calling this method alone is not enough to mark offsets as read: if I restart my application, it will read old messages again. (Well, that's why the Commit method was added, right?)

Then I implemented the following code, expecting to commit only the offsets marked with session.MarkMessage. I guess I was wrong, because calling Commit() caused all offsets to be marked as read. So if I run my app again, it will not read old messages, even those that I was expecting to read again (those not marked). Does it make any sense to commit offsets that are not marked? See my code below:

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	go commitMessages(session)

	for message := range claim.Messages() {
		consumerMessageCount++
		err := consumer.processMessage(message, consumer.callback)
		if err == nil {
			session.MarkMessage(message, "")
		}
	}
	return nil
}

func commitMessages(session sarama.ConsumerGroupSession) {
	if autoCommit {
		return
	}
	for {
		session.Commit()
		time.Sleep(fewSeconds)
	}
}

Well, let me go back a little bit. I ran a different scenario now: the last message sent/read was one where consumer.processMessage returned an error, which means the message was not marked as read. When I re-run my app, this message gets read again, but older messages are not read again. Every time I restart the app, the same behaviour is seen. Yey! That's what I expected.

But if I continue my test and produce/read more messages, and the new messages are marked as read and Commit is called, then when I stop the consumer app and re-run it, the old ERROR messages (not marked) will not be read again. Why? Is it because, when Commit is called, Kafka remembers the last committed offset for that particular consumer group?

What I need is: mark successful messages as read, and leave the messages with errors unmarked, so that I am able to process them again later.

Is this scenario possible with Kafka + Sarama? Or should I think of something else, like using a DLQ to handle messages with errors so they can be reprocessed later?

@danieldestro

danieldestro commented Aug 2, 2020

My third test:

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	for message := range claim.Messages() {
		consumerMessageCount++
		err := consumer.processMessage(message, consumer.callback)
		if err == nil {
			session.MarkMessage(message, "")
			session.Commit()
		}
	}

	return nil
}

Even when a message errors (consumer.processMessage returns an error), after I restart my application that message is considered read too.

Well, I confess I am kind of confused here about what the expected behavior should be when using AutoCommit=false and calling the MarkMessage and Commit methods, and about the correct implementation for that behavior.

@wclaeys
Contributor

wclaeys commented Aug 3, 2020

> Kafka remembers the last committed offset for that particular consumer group?

Yes.

> Is this scenario possible with Kafka + Sarama?

No. (This has nothing to do with Sarama; it's by design how Kafka works, as it's a distributed message queue.)

@wclaeys
Contributor

wclaeys commented Aug 3, 2020

@dnwe We have had it running on our DEV and TEST environments since the 24th of June without any issues (actually even before that date, when we were running a private fork).

@danieldestro

Thanks, @wclaeys. So what is the scenario, and what results do you expect from setting AutoCommit=false and calling Commit()? That way I can understand better how this patch works.

@alok87

alok87 commented Aug 9, 2020

I also got confused, so I ran the following tests:

| #test | AutoCommit | Commit() called? | MarkMessage | Expected behaviour | Result/Question |
| --- | --- | --- | --- | --- | --- |
| 1 | true | not called | true | Restarting the application should not consume the marked messages | Perfect ✅ |
| 2 | false | not called | true | Restarting the application should have consumed the messages again, as they were marked but commit was not called manually | Did not happen as expected: non-committed but marked messages were not consumed again!! 😕 ❌ |
| 3 | false | called | true | Restarting the application should not consume the marked, committed messages | Perfect ✅ |

Please clear the confusion for #test2.

@alok87

alok87 commented Aug 9, 2020

@danieldestro I could not reproduce the issue you are reporting here. #1570 (comment)
When processMessage() returns an error, a restart executes the same job again, as expected. And once the message has been processed without error, a subsequent restart does not receive the same job, as expected.

My third test:

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	for message := range claim.Messages() {
		consumerMessageCount++
		err := consumer.processMessage(message, consumer.callback)
		if err == nil {
			session.MarkMessage(message, "")
			session.Commit()
		}
	}

	return nil
}

@danieldestro

danieldestro commented Aug 12, 2020

@alok87 as far as I remember, in my tests this scenario occurred:

Test-1:

  • Program consumes OK messages and mark them and commit
  • Restart program and older marked messages are not consumed anymore (GOOD)

Test-2:

  • Program consumes OK messages and mark them and commit
  • Last message returns an error (by method consumer.processMessage)
  • Restart program and older marked messages are not consumed anymore (GOOD)
  • Last message not marked (error) is read again (GOOD)

Test-3:

  • Program consumes OK messages and mark them and commit
  • Second last message returns an error (by method consumer.processMessage)
  • Last message is OK and is marked and commited
  • Restart program and older marked messages are not consumed anymore (GOOD)
  • Second last message not marked (error) is NOT read again
  • Last message (OK) is NOT read again

@yogeshbombepune

yogeshbombepune commented Aug 27, 2020

Is there any programmatic way, or Kafka config, to reprocess consumed but not committed and not marked messages at runtime, without restarting the application? Or to have them reassigned to another member of that consumer group?

@alok87

alok87 commented Mar 3, 2021

Test1 is failing for me #1570 (comment)

@alok87

alok87 commented Mar 3, 2021

@danieldestro I think the behaviour you saw with the second-last message here (Test 3) #1570 (comment) was because the last message's offset got committed, so new consumption will only begin from the last committed offset + 1, which is what is happening in your case.

The second-last errored message, which was not marked, gets skipped.

@shmilyoo

@danieldestro

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

	go commitMessages(session)

	for message := range claim.Messages() {
		consumerMessageCount++
		err := consumer.processMessage(message, consumer.callback)
		if err == nil {
			session.MarkMessage(message, "")
		}
	}
	return nil
}

ConsumeClaim will be called again after each Kafka consumer-group rebalance, so your commitMessages goroutine will be started many times. That may not be good.
