Handle partition reassignment #2
src/consumer/worker.js
Outdated
})
continue
}
partitionAssignments.set(key, workerId)
A batch can be sitting in the queue without being processed yet.
In that case, a fetcher can still add a duplicate batch to the queue.
I would suggest keeping the partition assignments in the fetch manager, but doing something similar in the fetcher instead, i.e. assign partitions from the fetch response to nodeId(?), and for any fetch request, filter out partitions that are already in progress or in the queue.
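A rough sketch of what that request-side filter could look like. All names here (`inFlight`, `keyOf`, `partitionsToFetch`) are hypothetical and not part of KafkaJS, and it assumes a partition only needs to be skipped when a different node already holds a queued or in-progress batch for it:

```javascript
// Hypothetical sketch; none of these names come from KafkaJS.
// Maps "topic|partition" -> nodeId that has a batch queued or in progress.
const inFlight = new Map()

const keyOf = (topic, partition) => `${topic}|${partition}`

// Returns only the topic-partitions that nodeId may include in its next
// fetch request: those with no owner, or those it already owns itself.
function partitionsToFetch(nodeId, topicPartitions) {
  return topicPartitions.filter(({ topic, partition }) => {
    const owner = inFlight.get(keyOf(topic, partition))
    return owner === undefined || owner === nodeId
  })
}
```

Entries would be added to `inFlight` when a fetch response is queued and removed once the worker has finished the batch.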
Right, I'll give this a crack and see if I can make it work.
Moved it up to the fetch manager/fetcher now, to make sure that only one of them can push batches for a topic-partition at any given time. I removed the filtering from the worker, as I don't believe it should be necessary anymore.
Co-authored-by: Priit Käärd <priitkaard123@gmail.com>
If a partition is reassigned to a different broker, it's possible that a fetcher currently has a batch for that topic-partition in the worker queue. This would cause double processing. To avoid this, the fetch manager now keeps track of which fetcher is currently handling each topic-partition, and the fetchers will filter out batches for any topic-partition that is currently being processed by a different fetcher.
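For illustration only, a minimal sketch of the bookkeeping described above. The `AssignmentTracker` class and its `claim`/`release` methods are hypothetical names chosen for this example; they are not taken from the actual change:

```javascript
// Hypothetical sketch; not the code in this PR.
class AssignmentTracker {
  constructor() {
    // "topic|partition" -> id of the fetcher currently handling it
    this.owners = new Map()
  }

  // Drops batches whose topic-partition is held by a different fetcher
  // and records ownership for the batches that are kept.
  claim(fetcherId, batches) {
    return batches.filter(({ topic, partition }) => {
      const key = `${topic}|${partition}`
      const owner = this.owners.get(key)
      if (owner !== undefined && owner !== fetcherId) return false
      this.owners.set(key, fetcherId)
      return true
    })
  }

  // Frees the topic-partition once its batch has been processed,
  // but only if the caller is the recorded owner.
  release(fetcherId, { topic, partition }) {
    const key = `${topic}|${partition}`
    if (this.owners.get(key) === fetcherId) this.owners.delete(key)
  }
}
```

In this sketch a fetcher would call `claim` before pushing batches to the worker queue and `release` after a batch is done, so after a reassignment the new fetcher's batches are dropped until the old one has finished.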
…kajs into handle-partition-reassignment
Attempt at fixing tulios#1258 (comment)