diff --git a/consumer.go b/consumer.go index 0a9a6c31b..45e6ce88c 100644 --- a/consumer.go +++ b/consumer.go @@ -944,6 +944,16 @@ func (bc *brokerConsumer) subscriptionConsumer() { bc.acks.Add(len(bc.subscriptions)) for child := range bc.subscriptions { + if _, ok := response.Blocks[child.topic]; !ok { + bc.acks.Done() + continue + } + + if _, ok := response.Blocks[child.topic][child.partition]; !ok { + bc.acks.Done() + continue + } + child.feeder <- response } bc.acks.Wait()