diff --git a/consumer_group.go b/consumer_group.go index ddd1a5bb3..4f0fd6660 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -86,9 +86,10 @@ type consumerGroup struct { memberID string errors chan error - lock sync.Mutex - closed chan none - closeOnce sync.Once + lock sync.Mutex + errorsLock sync.RWMutex + closed chan none + closeOnce sync.Once userData []byte @@ -159,10 +160,13 @@ func (c *consumerGroup) Close() (err error) { err = e } - // drain errors go func() { + c.errorsLock.Lock() + defer c.errorsLock.Unlock() close(c.errors) }() + + // drain errors for e := range c.errors { err = e } @@ -592,6 +596,8 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { return } + c.errorsLock.RLock() + defer c.errorsLock.RUnlock() select { case <-c.closed: // consumer is closed