Fixed race condition on multi-topic consumer (#11764)
### Motivation

Under certain conditions, consumption can stall for applications that use the multi-topic consumer.

The conditions to reproduce the issue are:
 * Consumer is subscribed to multiple topics, but only 1 topic has traffic
 * Messages are published in batches (no repro if no batches)
 * Receiver queue size == 1 (or small, in order to exercise race condition)

The problem is a race condition between 2 threads when we're deciding whether to put one of the individual consumers in the "paused" state because the shared queue is full.

What happens is that, just after we have checked the conditions and decided to mark the consumer as paused, the application empties the shared queue completely. From that point on, nothing re-checks whether that consumer needs to be unblocked.

### Modification

Instead of introducing a sync block (contended by many consumers), we just double-check the state of the shared queue after marking the consumer as "paused". If the other thread has emptied the queue in the meantime, the consumer is guaranteed to be unblocked.
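
The pattern can be illustrated with a minimal, single-threaded sketch that replays the problematic interleaving deterministically. This is not the Pulsar client code: the class `PauseRecheckSketch`, its fields, and the `interleaved` hook (which stands in for the application thread running between the check and the pause) are all hypothetical names chosen for illustration, and the full/resume conditions are simplified.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of a shared incoming queue and a set of paused consumers.
// The real client uses concurrent collections; plain queues suffice here because the
// interleaving is replayed on a single thread.
final class PauseRecheckSketch {
    private final Queue<String> incoming = new ArrayDeque<>();
    private final Queue<String> paused = new ArrayDeque<>();
    private final int maxQueueSize;
    private final boolean recheckAfterPause;

    PauseRecheckSketch(int maxQueueSize, boolean recheckAfterPause) {
        this.maxQueueSize = maxQueueSize;
        this.recheckAfterPause = recheckAfterPause;
    }

    void offer(String message) {
        incoming.add(message);
    }

    // Application side: drain the shared queue, then resume any paused consumers.
    void drainAll() {
        incoming.clear();
        resumeIfNeeded();
    }

    // Resume paused consumers while the shared queue has room.
    void resumeIfNeeded() {
        while (incoming.size() < maxQueueSize && !paused.isEmpty()) {
            paused.poll(); // a real client would schedule the next receive here
        }
    }

    // Receive path. `interleaved` simulates the other thread running after the
    // "queue is full" check but before the consumer is added to the paused set.
    void onMessageReceived(String consumer, Runnable interleaved) {
        boolean full = incoming.size() >= maxQueueSize; // 1. check the condition
        interleaved.run();                              // 2. other thread empties the queue
        if (full) {
            paused.add(consumer);                       // 3. pause based on a stale check
            if (recheckAfterPause) {
                resumeIfNeeded();                       // 4. the fix: re-check after pausing
            }
        }
    }

    // A consumer is stalled if it is paused even though the queue has room.
    boolean isStalled(String consumer) {
        return paused.contains(consumer) && incoming.size() < maxQueueSize;
    }

    public static void main(String[] args) {
        for (boolean recheck : new boolean[] {false, true}) {
            PauseRecheckSketch s = new PauseRecheckSketch(1, recheck);
            s.offer("m1");
            s.onMessageReceived("c1", s::drainAll);
            System.out.println("recheck=" + recheck + " stalled=" + s.isStalled("c1"));
        }
    }
}
```

Without the re-check, the consumer stays in the paused set while the queue is empty, and nothing triggers another resume attempt. With the re-check, the thread that paused the consumer observes the emptied queue and resumes it itself, so no lock around the check-and-pause step is needed.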

(cherry picked from commit f1d66d1)
merlimat authored and hangc0276 committed Aug 25, 2021
1 parent 8ae8f55 commit 587548e
Showing 1 changed file with 5 additions and 0 deletions.
@@ -259,6 +259,11 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
// mark this consumer to be resumed later: if No more space left in shared queue,
// or if any consumer is already paused (to create fair chance for already paused consumers)
pausedConsumers.add(consumer);

// Since we didn't get a mutex, the condition on the incoming queue might have changed after
// we have paused the current consumer. We need to re-check in order to avoid this consumer
// from getting stalled.
resumeReceivingFromPausedConsumersIfNeeded();
} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow