Bugfix/112 lost messages #113
Hi, did you have time to take a look at it? Thanks
Thanks, I will do!
@ppatierno we have a 3.5.4 release soon; we can also backport this if you review it.
@vietj I have planned to review this one today ;)
If you can backport it to the 3.5 branch and add it here, that would be great: vert-x3/issues#401
@vietj I am thinking about this PR and the problem that @emasab is facing. He noticed that:
is called even when we are still getting messages from
So on the main event loop, right? How is it possible for you to have two calls concurrently?
finally another question for @emasab do we really need the |
@ppatierno About the concurrency problem, it seems that the flow of execution is more or less this one:
Here the bug isn't avoidable anymore
The names correspond to these pieces of code:
schedule-0
    // Don't call pollRecords directly, but use schedule() to actually pause when the readStream is paused
    schedule(0);
schedule-1
    Handler<ConsumerRecord<K, V>> handler = this.recordHandler;
    this.context.runOnContext(v1 -> {
resume
    if (this.paused.compareAndSet(true, false)) {
      this.schedule(0);
    }
pollRecords
    if (this.current == null || !this.current.hasNext()) {
      this.pollRecords(records -> {
worker
    this.worker.submit(() -> {
poll-1
    ConsumerRecords<K, V> records = this.consumer.poll(pollTimeout);
poll-2
    this.context.runOnContext(v -> handler.handle(records));
current
    this.current = records.iterator();
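The interleaving described above can be sketched with a small single-threaded model. This is only an illustration under assumptions: the class `LostMessagesSketch`, its methods, and the batch contents are hypothetical and are not the real consumer code. It shows how a second poll result that overwrites `current` before the first batch is drained drops records, and how an "in-flight" guard avoids that.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of the batch handling discussed above.
// It is not the real consumer implementation; all names are illustrative.
class LostMessagesSketch {
  private Iterator<String> current;   // stands in for this.current
  private boolean polling;            // true while a poll is in flight
  private final boolean guarded;      // whether the in-flight guard is enabled
  final List<String> delivered = new ArrayList<>();

  LostMessagesSketch(boolean guarded) {
    this.guarded = guarded;
  }

  // Models schedule(0) -> pollRecords: returns true if a poll was submitted.
  boolean startPoll() {
    if (current != null && current.hasNext()) {
      return false;                   // still draining the current batch
    }
    if (guarded && polling) {
      return false;                   // a poll is already in flight
    }
    polling = true;
    return true;
  }

  // Models poll-2 / current: the worker's result replaces the iterator.
  void completePoll(List<String> batch) {
    current = batch.iterator();
    polling = false;
  }

  void deliverOne() {
    if (current != null && current.hasNext()) {
      delivered.add(current.next());
    }
  }

  void drain() {
    while (current != null && current.hasNext()) {
      delivered.add(current.next());
    }
  }

  static List<String> run(boolean guarded) {
    LostMessagesSketch s = new LostMessagesSketch(guarded);
    boolean first = s.startPoll();    // schedule-0
    boolean second = s.startPoll();   // resume schedules again before the first result arrives
    if (first) s.completePoll(Arrays.asList("a1", "a2", "a3"));
    s.deliverOne();                   // only a1 has been handled so far
    if (second) s.completePoll(Arrays.asList("b1", "b2")); // overwrites the iterator
    s.drain();
    if (!second && s.startPoll()) {   // guarded case: the rejected poll is retried later
      s.completePoll(Arrays.asList("b1", "b2"));
      s.drain();
    }
    return s.delivered;
  }

  public static void main(String[] args) {
    System.out.println("unguarded: " + run(false));
    System.out.println("guarded:   " + run(true));
  }
}
```

In this model the unguarded run delivers `[a1, b1, b2]` (a2 and a3 are lost), while the guarded run delivers all five records in order.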
Could you rebase this branch? I pushed a late commit to master to support the ReadStream#fetch method that is new in 3.6.
Almost ready, but could you push the fetch method of the interfaces? It gives me a compile error.
The method is in vertx core master.
Ah, ok. It's in ReadStream.
c046506
to
47ee65a
Compare
now it's ready to be merged |
exceptionHandler.handle(e);
}
}
if(!this.polling.compareAndSet(false, true)){
I think we could make this method always run on the event loop and remove the AtomicBoolean here with:
private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
  if (Vertx.currentContext() == this.context) {
    pollRecordsImpl(handler);
  } else {
    vertx.runOnContext(v -> pollRecordsImpl(handler));
  }
}
private void pollRecordsImpl(Handler<ConsumerRecords<K, V>> handler) {
  ...
}
This way we avoid the CAS, and most of the time this will be called from the event loop anyway.
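The "run inline if already on the owning thread, otherwise dispatch" idea in this suggestion can be tried out without Vert.x. The following is a plain-JDK analogue under assumptions: a single-thread executor stands in for the event-loop context, and the class `RunOnLoopSketch` and the thread name `event-loop` are hypothetical. It is not the Vert.x API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogue of the suggestion above. A single-thread executor stands in
// for the event-loop context; all names here are hypothetical.
class RunOnLoopSketch {
  private volatile Thread loopThread;
  private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
    loopThread = new Thread(r, "event-loop");
    return loopThread;
  });
  // Records which thread executed each pollRecordsImpl call.
  final List<String> ranOn = new CopyOnWriteArrayList<>();

  void pollRecords(Runnable handler) {
    if (Thread.currentThread() == loopThread) {
      pollRecordsImpl(handler);                       // already on the loop: run inline
    } else {
      loop.execute(() -> pollRecordsImpl(handler));   // hop onto the loop thread
    }
  }

  // Only ever runs on the loop thread, so state touched here needs no CAS.
  private void pollRecordsImpl(Runnable handler) {
    ranOn.add(Thread.currentThread().getName());
    handler.run();
  }

  static List<String> demo() {
    RunOnLoopSketch s = new RunOnLoopSketch();
    CountDownLatch done = new CountDownLatch(1);
    // Outer call comes from the caller's thread; the nested call comes from the loop.
    s.pollRecords(() -> s.pollRecords(done::countDown));
    try {
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("timed out");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      s.loop.shutdown();
    }
    return s.ranOn;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Both recorded entries are `event-loop`: the first call is dispatched from the calling thread, the nested one runs inline. Note that this only confines the code inside `pollRecordsImpl`; any flag that a worker thread also resets still needs cross-thread visibility.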
It wouldn't be the same, because there we are already inside the vertx thread. The AtomicBoolean is useful because the variable is accessed by the vertx thread here and by the executorService thread at if(!submitted) this.polling.set(false); What the code does is avoid submitting to the worker now and try again once the iterator has been overwritten and the if at pollRecords returns false. This may cause a semi-loop, because schedule(0) keeps looping until the iterator is overwritten, so there must be a cleaner solution.
Another thing: I've found that if you remove the schedule(1) at line 142, most times it works, but sometimes that partition hangs and the unit test times out. I've seen this behavior with a real Kafka broker too: at some point the partitions hang one after another until the consumer blocks completely.
@emasab in this case, if that's only read by the executor service, it only needs to be volatile; or at least we can avoid the compare-and-set if it's always written from the same thread and just use a set, which is cheaper.
Yes, that's why in the beginning I didn't use an AtomicBoolean. The alternative is to do
if(!this.polling){
  this.polling = true;
  ....
} else {
  schedule(1);
}
Here only one thread, the vertx thread, executes this piece of code, so two threads cannot enter the if before polling is set to true. If we keep the AtomicBoolean, it's better to use compareAndSet; otherwise we could use a simple boolean like in this example and make it volatile.
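The two options discussed in these comments can be compared in a small sketch. This is an illustration under assumptions, with hypothetical names (`PollingGuardSketch`, `tryStartPoll`, `pollFinished`, `winnersWithCas`) that are not part of the PR: a plain boolean guard that is only correct when every call comes from one thread, and a `compareAndSet` guard that lets exactly one of several racing threads through.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the two guard styles; not the code from the PR.
class PollingGuardSketch {
  // Variant 1: plain boolean, correct only if every call comes from one thread.
  private boolean polling;

  boolean tryStartPoll() {
    if (!polling) {
      polling = true;
      return true;
    }
    return false;                     // the caller would reschedule, e.g. schedule(1)
  }

  void pollFinished() {
    polling = false;
  }

  // Variant 2: several threads race on one flag; compareAndSet lets exactly one win.
  static int winnersWithCas(int threads) {
    AtomicBoolean flag = new AtomicBoolean(false);
    AtomicInteger winners = new AtomicInteger();
    CountDownLatch start = new CountDownLatch(1);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < threads; i++) {
      pool.execute(() -> {
        try {
          start.await();              // release all threads at once
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        if (flag.compareAndSet(false, true)) {
          winners.incrementAndGet();
        }
      });
    }
    start.countDown();
    pool.shutdown();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return winners.get();
  }

  public static void main(String[] args) {
    PollingGuardSketch g = new PollingGuardSketch();
    System.out.println(g.tryStartPoll());   // first attempt succeeds
    System.out.println(g.tryStartPoll());   // second is rejected while polling
    System.out.println(winnersWithCas(8));  // one winner among the racing threads
  }
}
```

If the flag is written on one thread and only read on another, `volatile` would be enough for visibility; the check-then-set in variant 1 is not atomic, which is why it must stay on a single thread.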
47ee65a
to
4962cc7
Compare
I've made a new change. With it, the loop or semi-loop (with schedule(1)) should be completely avoided while the bug remains fixed. Now the polling boolean is used exclusively by the vertx thread, so I could replace it with a simple boolean if you agree.
news? |
I think it's fine but keep an eye on the CI. I had to fix tests because they were reporting false in Travis and make sure that the tests pass more consistently. |
Nothing more to fix, just this change
but it doesn't change much, do you want to make it? |
Unit test and fix for issue:
#112