From e26c6831ced9eb1bfc37e2c72a160c2042cc4a8f Mon Sep 17 00:00:00 2001 From: Jacob-bzx Date: Sat, 10 Sep 2022 00:35:58 +0800 Subject: [PATCH] fix: race condition when closing consumer group --- consumer_group.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/consumer_group.go b/consumer_group.go index b20edd978..ce4cc77af 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -86,9 +86,10 @@ type consumerGroup struct { memberID string errors chan error - lock sync.Mutex - closed chan none - closeOnce sync.Once + lock sync.Mutex + errorsLock sync.RWMutex + closed chan none + closeOnce sync.Once userData []byte } @@ -156,10 +157,13 @@ func (c *consumerGroup) Close() (err error) { err = e } - // drain errors go func() { + c.errorsLock.Lock() + defer c.errorsLock.Unlock() close(c.errors) }() + + // drain errors for e := range c.errors { err = e } @@ -557,6 +561,8 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { return } + c.errorsLock.RLock() + defer c.errorsLock.RUnlock() select { case <-c.closed: // consumer is closed