diff --git a/consumer_group.go b/consumer_group.go index b20edd978..ce4cc77af 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -86,9 +86,10 @@ type consumerGroup struct { memberID string errors chan error - lock sync.Mutex - closed chan none - closeOnce sync.Once + lock sync.Mutex + errorsLock sync.RWMutex + closed chan none + closeOnce sync.Once userData []byte } @@ -156,10 +157,13 @@ func (c *consumerGroup) Close() (err error) { err = e } - // drain errors go func() { + c.errorsLock.Lock() + defer c.errorsLock.Unlock() close(c.errors) }() + + // drain errors for e := range c.errors { err = e } @@ -557,6 +561,8 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { return } + c.errorsLock.RLock() + defer c.errorsLock.RUnlock() select { case <-c.closed: // consumer is closed