Skip to content

Commit

Permalink
fix(consumer): avoid pushing unrelated responses to paused children
Browse files Browse the repository at this point in the history
Signed-off-by: Panos Koutsovasilis <koutsobill@hotmail.com>
  • Loading branch information
pkoutsovasilis committed Aug 11, 2022
1 parent 73de8dd commit 3c463f8
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions consumer.go
Expand Up @@ -944,6 +944,16 @@ func (bc *brokerConsumer) subscriptionConsumer() {

bc.acks.Add(len(bc.subscriptions))
for child := range bc.subscriptions {
if _, ok := response.Blocks[child.topic]; !ok {
bc.acks.Done()
continue
}

if _, ok := response.Blocks[child.topic][child.partition]; !ok {
bc.acks.Done()
continue
}

child.feeder <- response
}
bc.acks.Wait()
Expand Down

0 comments on commit 3c463f8

Please sign in to comment.