From 3c463f86af6d66ed0f5a7f838394cb457103df50 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 12 Aug 2022 02:53:17 +0300 Subject: [PATCH] fix(consumer): avoid pushing unrelated responses to paused children Signed-off-by: Panos Koutsovasilis --- consumer.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/consumer.go b/consumer.go index 0a9a6c31b..45e6ce88c 100644 --- a/consumer.go +++ b/consumer.go @@ -944,6 +944,16 @@ func (bc *brokerConsumer) subscriptionConsumer() { bc.acks.Add(len(bc.subscriptions)) for child := range bc.subscriptions { + if _, ok := response.Blocks[child.topic]; !ok { + bc.acks.Done() + continue + } + + if _, ok := response.Blocks[child.topic][child.partition]; !ok { + bc.acks.Done() + continue + } + child.feeder <- response } bc.acks.Wait()