Skip to content

Commit

Permalink
Merge pull request #2317 from pkoutsovasilis/fix-unrelated-responses-…
Browse files Browse the repository at this point in the history
…paused-child

fix(consumer): avoid pushing unrelated responses to paused children
  • Loading branch information
dnwe committed Sep 15, 2022
2 parents 73de8dd + 3c463f8 commit 34a28a1
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions consumer.go
Expand Up @@ -944,6 +944,16 @@ func (bc *brokerConsumer) subscriptionConsumer() {

bc.acks.Add(len(bc.subscriptions))
for child := range bc.subscriptions {
if _, ok := response.Blocks[child.topic]; !ok {
bc.acks.Done()
continue
}

if _, ok := response.Blocks[child.topic][child.partition]; !ok {
bc.acks.Done()
continue
}

child.feeder <- response
}
bc.acks.Wait()
Expand Down

0 comments on commit 34a28a1

Please sign in to comment.