Skip to content

Commit

Permalink
fix: race condition when closing consumer group
Browse files Browse the repository at this point in the history
  • Loading branch information
napallday committed Sep 15, 2022
1 parent 3083a9b commit e26c683
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions consumer_group.go
Expand Up @@ -86,9 +86,10 @@ type consumerGroup struct {
memberID string
errors chan error

lock sync.Mutex
closed chan none
closeOnce sync.Once
lock sync.Mutex
errorsLock sync.RWMutex
closed chan none
closeOnce sync.Once

userData []byte
}
Expand Down Expand Up @@ -156,10 +157,13 @@ func (c *consumerGroup) Close() (err error) {
err = e
}

// drain errors
go func() {
c.errorsLock.Lock()
defer c.errorsLock.Unlock()
close(c.errors)
}()

// drain errors
for e := range c.errors {
err = e
}
Expand Down Expand Up @@ -557,6 +561,8 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
return
}

c.errorsLock.RLock()
defer c.errorsLock.RUnlock()
select {
case <-c.closed:
// consumer is closed
Expand Down

0 comments on commit e26c683

Please sign in to comment.