AutoCommit and MarkOffset behaviour #1689

Closed
harshana-pickme opened this issue May 6, 2020 · 12 comments

@harshana-pickme

Versions

Version: 1.26.1

| Sarama | Kafka | Go |

Problem Description

Hi all,

In group consumers, when conf.AutoCommit is on, the consumed messages are committed automatically. When I turn AutoCommit off to control this behavior myself, I use MarkOffset() to do a soft commit, but these offsets never get committed. Am I doing something wrong, or is there a step I am missing?

Please help.

Thanks,
Harshana

@knoguchi

knoguchi commented May 10, 2020

Sarama does not have a manual commit function, although the AutoCommit.Enable = false config option exists. See #1570.

@harshana-pickme
Author

What happens if I leave auto_commit=true and call MarkOffset()?

What is the actual functionality of MarkOffset()?

@wclaeys
Contributor

wclaeys commented May 11, 2020

#1699 adds a manual commit function and disables the mainloop ticker if AutoCommit.Enable is false.

@harshana-pickme
Author

Can someone explain the purpose of ConsumerGroupSession.MarkOffset() and what happens to commits in the following scenarios?

For a group consumer:

  1. AutoCommit.Enable=true, the app does not call ConsumerGroupSession.MarkOffset().
  2. AutoCommit.Enable=true, the app calls ConsumerGroupSession.MarkOffset() from time to time.
  3. AutoCommit.Enable=false, the app does not call ConsumerGroupSession.MarkOffset().
  4. AutoCommit.Enable=false, the app calls ConsumerGroupSession.MarkOffset() from time to time.

Thank you.

@harshana-pickme
Author

Can someone please help me with the above query?

@harshana-pickme
Author

Let me rephrase the question.
When auto commit is enabled, Sarama will commit even if I don't call MarkOffset(). What, then, is the use of MarkOffset(), and how can we make sure Sarama hasn't already committed a given offset by the time the app calls MarkOffset()?

@coldfire-x

@harshana-pickme any progress?

@dnwe
Collaborator

dnwe commented Jun 23, 2020

@harshana-pickme as per the discussion in #1570, disabling AutoCommit in the currently available versions of Sarama was not useful, and I think #1699 should provide you with the range of capabilities that you are interested in.

MarkOffset is used to internally track the next offset on a given partition that a fetch should start from, so the expectation is that you would always call it for every message that you've processed within your app (here using the convenient MarkMessage func so that it correctly sets offset+1):

for m := range claim.Messages() {
	// TODO: process message.Value
	session.MarkMessage(m, "")
}

AutoCommit will periodically (on the given interval) commit whatever the currently marked offsets are for the topic partitions assigned to your consumer.
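To make the difference between marking and committing concrete, here is a minimal stdlib-only model. It is a sketch and not Sarama's implementation: `offsetTracker`, `Mark`, `commit`, and `Committed` are hypothetical names invented for illustration. The assumption it encodes is the one described above: marking only updates in-memory state, and a commit (which AutoCommit would trigger on each interval) publishes whatever is currently marked.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetTracker is a hypothetical, simplified model of the offset state
// for a single partition. It is NOT Sarama's implementation.
type offsetTracker struct {
	mu        sync.Mutex
	marked    int64 // next offset to fetch, updated in memory by Mark
	committed int64 // value most recently published by commit
}

// Mark records "processed offset + 1" as the next offset to fetch.
// In this model a mark never moves the position backwards.
func (t *offsetTracker) Mark(processed int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if next := processed + 1; next > t.marked {
		t.marked = next
	}
}

// commit publishes whatever is currently marked. With auto commit enabled,
// something like this would run once per interval.
func (t *offsetTracker) commit() int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.committed = t.marked
	return t.committed
}

// Committed returns the last published offset.
func (t *offsetTracker) Committed() int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.committed
}

func main() {
	tr := &offsetTracker{}
	tr.Mark(0)
	tr.Mark(1)
	fmt.Println("committed before tick:", tr.Committed()) // 0: marking alone commits nothing
	tr.commit()                                           // stands in for one auto-commit tick
	fmt.Println("committed after tick:", tr.Committed())  // 2: the marked offset is now published
}
```

In this model, a consumer that never marks anything has nothing new to commit, no matter how often the tick fires.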

If you disable AutoCommit (once #1699 has merged) then you'd be expected to call Commit() yourself on some regular cadence, which again would commit the currently marked offsets to the consumer group in Kafka.
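As a rough sketch of what "some regular cadence" could look like, the example below marks every processed message and commits once every N messages. To keep it runnable without a broker, it does not import Sarama: `message` and `session` are hypothetical stand-ins that mirror only the fields and methods discussed here (MarkOffset and Commit), and `processAndCommit` and `fakeSession` are illustrative names, not part of any library.

```go
package main

import "fmt"

// message is a hypothetical stand-in for the consumer message fields used here.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
}

// session is a hypothetical stand-in for the subset of a consumer group
// session used in this sketch.
type session interface {
	MarkOffset(topic string, partition int32, offset int64, metadata string)
	Commit()
}

// processAndCommit marks each processed message (offset+1, i.e. the next
// offset to read) and commits after every `every` messages, plus once at
// the end for any remainder.
func processAndCommit(s session, msgs []message, every int) {
	pending := 0
	for _, m := range msgs {
		// TODO: process the message here.
		s.MarkOffset(m.Topic, m.Partition, m.Offset+1, "")
		pending++
		if pending == every {
			s.Commit()
			pending = 0
		}
	}
	if pending > 0 {
		s.Commit()
	}
}

// fakeSession records what would have been committed, so the sketch can
// run without Kafka.
type fakeSession struct {
	marked  int64
	commits []int64
}

func (f *fakeSession) MarkOffset(_ string, _ int32, offset int64, _ string) {
	f.marked = offset
}

func (f *fakeSession) Commit() {
	f.commits = append(f.commits, f.marked)
}

func main() {
	msgs := make([]message, 5)
	for i := range msgs {
		msgs[i] = message{Topic: "events", Partition: 0, Offset: int64(i)}
	}
	s := &fakeSession{}
	processAndCommit(s, msgs, 2)
	fmt.Println(s.commits) // [2 4 5]
}
```

Committing per batch rather than per message is only one possible trade-off: fewer commit calls, at the cost of reprocessing up to one batch after a restart.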

@dnwe
Collaborator

dnwe commented Jun 24, 2020

As there's been a lot of interest in this functionality, it would be great if all the people watching this issue could test out the now merged PR from @wclaeys with their application code in advance of us cutting any new release version.

Assuming you're using go modules to manage your dependencies, you can update your go.mod to fetch the latest commit from the default branch by doing:

go get github.com/Shopify/sarama@HEAD

I'm going to close this on the assumption that the questions and the problem have now been resolved. Feel free to re-open or raise new issues if you are still finding it problematic.

@dnwe dnwe closed this as completed Jun 24, 2020
@harshana-pickme
Author

Thank you @dnwe for the helpful comments. Now it's clear what the intended use of Mark() is. It may be helpful for others if a summary can be put in the help doc as well.

Thank you again.
Harshana

@qiangmzsx
Contributor

I am using v1.27.0 and have set the following:

mqConfig.Consumer.Offsets.AutoCommit.Enable = true
mqConfig.Consumer.Offsets.AutoCommit.Interval = 100 * time.Millisecond

But Sarama doesn't automatically commit the offset as expected.

I see, it still requires a manual process to submit, but the submissions are spaced out.
