[QUESTION] what's the recommended way to stop a consumer without auto-committing? #900

Open
ntextreme3 opened this issue Jun 6, 2023 · 3 comments

@ntextreme3

ntextreme3 commented Jun 6, 2023

consumer.stop will

Commit last consumed message if autocommit enabled

For a consumer with enable.auto.commit=True, I'd like to be able to shut down without committing, in order to maintain better control over failure / recovery scenarios.

Given a consumer in a group meant to run indefinitely, if we encounter an exception and need to shut down 1 consumer before the message has been processed, and we call consumer.stop as part of the process cleanup, the message in progress is effectively lost, since stop will commit.

Even without encountering an exception, if we wanted to take a node out of service and shut down before the message has finished processing, we should be able to do so without committing. The message in question could take a long time to process, so we can't just wait for it to finish before shutting down.

We can take over committing in this scenario, but that's less than ideal.

We can also just kill the process without stopping the consumer, but then you'd need to wait for session timeouts before a new consumer could get those partition assignments.

It wouldn't be enough to .close the coordinator either, as that also will commit during finalization.

The closest thing would be getting a reference to the coordinator and calling maybe_leave_group while shutting down, but the coordinator doesn't seem to be accessible through the public API anyway. The same goes for doing something like:

# not public
consumer._enable_auto_commit = False  
consumer._coordinator._enable_auto_commit = False
await consumer.stop()

It seems like the cleanest option would be something like await consumer.stop(commit=False).

Any suggestions?

@vmaurin
Contributor

vmaurin commented Jun 7, 2023

I am coming from the Java clients, and, similar to aiokafka, when you have autocommit enabled, you cannot really control the processing guarantee, because the offset commits are performed in a separate coroutine/thread. What you seem to be aiming for is an "at least once" guarantee, and the only way to achieve that is to manage your commits yourself with this kind of loop:

while running:
    batch = await consumer.getmany(timeout_ms=xxxx, max_records=yyyy)
    # do something with your messages
    await consumer.commit()

So when you stop the consumer, you get an exception and you exit the loop without committing your offsets.
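
For reference, the manual-commit loop described above can be fleshed out roughly as follows. This is only a sketch under some assumptions: the consumer is created with enable_auto_commit=False (so that consumer.stop() leaves the group without committing), and the names consume_at_least_once, handle, stop_event, "my-topic", "my-group" and the broker address are hypothetical placeholders, not anything from aiokafka or this thread.

```python
import asyncio
import signal


async def consume_at_least_once(consumer, handle, stop_event,
                                timeout_ms=1000, max_records=100):
    """Poll, process, then commit. A batch that is interrupted by an
    exception or a task cancellation is never committed, so it is
    redelivered after the rebalance."""
    try:
        while not stop_event.is_set():
            batches = await consumer.getmany(
                timeout_ms=timeout_ms, max_records=max_records)
            if not batches:
                continue
            for _tp, messages in batches.items():
                for msg in messages:
                    await handle(msg)
            # Only reached when the whole batch was handled successfully.
            await consumer.commit()
    finally:
        # Assumes auto-commit is disabled, so stop() leaves the group
        # promptly without committing anything on our behalf.
        await consumer.stop()


async def main():
    # Imported here so the helper above can be exercised without a broker.
    from aiokafka import AIOKafkaConsumer

    consumer = AIOKafkaConsumer(
        "my-topic",                          # hypothetical topic
        bootstrap_servers="localhost:9092",  # hypothetical broker
        group_id="my-group",                 # hypothetical group
        enable_auto_commit=False,
    )
    await consumer.start()

    stop_event = asyncio.Event()
    # Unix only: request a graceful exit on SIGTERM.
    asyncio.get_running_loop().add_signal_handler(
        signal.SIGTERM, stop_event.set)

    async def handle(msg):  # hypothetical processing step
        print(msg.topic, msg.partition, msg.offset)

    await consume_at_least_once(consumer, handle, stop_event)
```

The stop_event check only takes effect between batches. If a single message can take a long time, cancelling the task that runs consume_at_least_once interrupts the handler instead, and the finally block still stops the consumer without a commit.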

@ods
Collaborator

ods commented Jun 7, 2023

As @vmaurin already mentioned, disabling autocommit won't solve your problem completely, as the offset may already have been committed by that point. One solution is to commit the offset manually after handling each event (not good for performance) or each batch. It would also be convenient to acknowledge (store in memory) offsets manually and commit them periodically in the background, as autocommit does, but this feature is not implemented in aiokafka yet (see #826).
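
Until something like that exists in the library, the "acknowledge in memory, commit periodically" idea can be approximated in user code. The following is a rough sketch, not an aiokafka feature: AckedOffsets, commit_acked and commit_periodically are hypothetical helpers, and it assumes messages within a partition are acknowledged in order and that auto-commit is disabled. The only library call relied on is consumer.commit() with a dict mapping a partition to the next offset to read.

```python
import asyncio


class AckedOffsets:
    """In-memory record of the next offset to commit per partition."""

    def __init__(self):
        self._pending = {}

    def ack(self, tp, offset):
        # Kafka expects the offset of the NEXT message to read.
        next_offset = offset + 1
        if next_offset > self._pending.get(tp, -1):
            self._pending[tp] = next_offset

    def drain(self):
        # Hand over everything acknowledged so far and start fresh.
        pending, self._pending = self._pending, {}
        return pending


async def commit_acked(consumer, acked):
    """Commit whatever has been acknowledged since the last call."""
    offsets = acked.drain()
    if offsets:
        await consumer.commit(offsets)
    return offsets


async def commit_periodically(consumer, acked, interval_s=5.0):
    """Background task: commit acknowledged offsets every interval_s."""
    while True:
        await asyncio.sleep(interval_s)
        await commit_acked(consumer, acked)
```

In the processing loop one would call acked.ack(tp, msg.offset) after each message is fully handled, run commit_periodically as a background task, and call commit_acked once more at shutdown. A message that was still in progress was never acknowledged, so its offset is not committed.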

@ntextreme3
Author

Ah yes, thanks! I guess I was more informed a year ago when I posted that other question. I'm just coming back to a project that we finally have time for 😅
