
fix: race condition(may panic) when closing consumer group #2331

Merged
merged 1 commit into main on Sep 27, 2022

Conversation

@napallday (Contributor) commented Sep 9, 2022

Background

We encountered a data race issue when closing a consumer group.

Versions


Sarama: 1.36.0
Kafka: 2.8.1
Go: 1.17.7
Logs


WARNING: DATA RACE
Write at 0x00c000f448b0 by goroutine 750:
  runtime.closechan()
      /Users/xxxxxxxxx/.gvm/gos/go1.17.7/src/runtime/chan.go:355 +0x0
  github.com/Shopify/sarama.(*consumerGroup).Close.func1.1()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/consumer_group.go:161 +0x44

Previous read at 0x00c000f448b0 by goroutine 1238:
  runtime.chansend()
      /Users/xxxxxxxxx/.gvm/gos/go1.17.7/src/runtime/chan.go:158 +0x0
  github.com/Shopify/sarama.(*consumerGroup).handleError()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/consumer_group.go:568 +0x22d
  github.com/Shopify/sarama.newConsumerGroupSession.func1()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/consumer_group.go:724 +0xb7
  github.com/Shopify/sarama.newConsumerGroupSession·dwrap·79()
      /Users/xxxxxxxxx/go/pkg/mod/github.com/!shopify/sarama@v1.36.0/consumer_group.go:726 +0x63

Investigation Results

After investigation, we found the root cause:

  • handleError is used to collect errors from heartBeatLoop, partitionConsumer, offsetManager, etc. Several goroutines are also spawned for error collecting, including example1, example2, example3 ....
  • If c.config.Consumer.Return.Errors=true, those errors are sent to a shared channel, c.errors.
  • The order of c.errors <- err and close(c.errors) is non-deterministic and could, in theory, even cause a panic. A possible panic scenario is shown in the flow below (a standalone sketch follows the screenshot): after handleError checks that the consumer group is not closed, the goroutine is descheduled and close(c.errors) is called. When the goroutine resumes, c.errors <- err panics because it sends on a closed channel.

(screenshot: flow of the problematic interleaving)
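For concreteness, here is a minimal self-contained sketch of that interleaving (plain Go, not sarama code; every name is illustrative, and the extra checked/resumed channels exist only to force the bad scheduling deterministically):

package main

import "errors"

func main() {
	closedSig := make(chan struct{}) // plays the role of c.closed
	errs := make(chan error, 1)      // plays the role of c.errors

	checked := make(chan struct{}) // "handleError has passed its closed-check"
	resumed := make(chan struct{}) // "Close() has finished closing the channels"

	// Plays the role of handleError.
	go func() {
		select {
		case <-closedSig:
			return // consumer already closed, drop the error
		default:
		}
		close(checked) // the check passed while the consumer was still open
		<-resumed      // simulate the goroutine being descheduled right here

		// By now errs has been closed, so this send panics with
		// "send on closed channel".
		errs <- errors.New("some consume error")
	}()

	// Plays the role of Close().
	<-checked
	close(closedSig)
	close(errs)
	close(resumed)

	select {} // keep main alive; the program dies with the panic above
}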

Solution

To avoid the deadlock issue raised in #1581, I create a dedicated lock instead of reusing the existing c.lock.
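For reference, a runnable miniature of that approach (illustrative only; it mirrors the handleError/Close snippets quoted later in this thread, with errorsLock as the dedicated sync.RWMutex):

package main

import (
	"errors"
	"sync"
)

// Minimal stand-in for the relevant parts of consumerGroup.
type group struct {
	errs       chan error
	closed     chan struct{}
	closeOnce  sync.Once
	errorsLock sync.RWMutex // dedicated lock, separate from the existing c.lock
}

func (g *group) handleError(err error) {
	g.errorsLock.RLock()
	defer g.errorsLock.RUnlock()

	select {
	case <-g.closed:
		return // consumer is closed: never send on a possibly-closed channel
	default:
	}

	select {
	case g.errs <- err:
	default:
		// no error listener
	}
}

func (g *group) Close() {
	g.closeOnce.Do(func() {
		close(g.closed)
		// Close the errors channel under the write lock, so no send can be
		// in flight between handleError's closed-check and its send.
		go func() {
			g.errorsLock.Lock()
			defer g.errorsLock.Unlock()
			close(g.errs)
		}()
	})
}

func main() {
	g := &group{errs: make(chan error, 1), closed: make(chan struct{})}
	go g.handleError(errors.New("some consume error"))
	g.Close()
	for range g.errs { // drain, as the real Close() does
	}
}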

@dnwe (Collaborator) commented Sep 15, 2022

@Jacob-bzx thanks for spotting and debugging this problem

I can see that the mutex approach will solve the data race, but I'm less sure about the potential panic.

I wonder if we might want to consider using a sync.Once and permitting the close in either handleError (if it has spotted that the consumer is being closed) or alternatively within the current func after the client Close has returned.

i.e.,

diff --git a/consumer_group.go b/consumer_group.go
index 7d755ea..3a8a713 100644
--- a/consumer_group.go
+++ b/consumer_group.go
@@ -86,9 +86,10 @@ type consumerGroup struct {
 	memberID        string
 	errors          chan error
 
-	lock      sync.Mutex
-	closed    chan none
-	closeOnce sync.Once
+	lock            sync.Mutex
+	closed          chan none
+	closeOnce       sync.Once
+	closeErrorsOnce sync.Once
 
 	userData []byte
 }
@@ -157,9 +158,6 @@ func (c *consumerGroup) Close() (err error) {
 		}
 
 		// drain errors
-		go func() {
-			close(c.errors)
-		}()
 		for e := range c.errors {
 			err = e
 		}
@@ -167,6 +165,10 @@ func (c *consumerGroup) Close() (err error) {
 		if e := c.client.Close(); e != nil {
 			err = e
 		}
+
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
 	})
 	return
 }
@@ -559,7 +561,10 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
 
 	select {
 	case <-c.closed:
-		// consumer is closed
+		// consumer is closed, close errors chan
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
 		return
 	default:
 	}

@napallday (Contributor, Author) commented
Hi @dnwe, I'm afraid the approach below with sync.Once doesn't actually resolve the data race issue.

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
        ...
	select {
	case <-c.closed:
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
                 return
	default:
	}

== step 1: switch to step 2 ==

== step 4: from step 3 ==

	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
== step 2: from step 1 ==
		close(c.closed)
                 ...

		// drain errors
=>1:		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}

=>2:
+		c.closeErrorsOnce.Do(func() {
+			close(c.errors)
+		})
	})

== step 3: switch to step4 ==
	return

I've marked a possible workflow that causes the data race as steps 1~4; it would also panic due to sending on a closed channel.
Moreover, if handleError is never called, then since we drain errors (=>1) before closing the channel (=>2), c.Close() may stay in the draining for-loop (=>1) forever and never return (a tiny standalone illustration follows below).
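That second failure mode is just the usual "range over a channel that is never closed" hang; a small self-contained toy, nothing sarama-specific:

package main

func main() {
	errs := make(chan error)

	// With close(c.errors) moved to after the drain loop, and handleError
	// never running, Close() effectively does this:
	for range errs {
		// never entered and never exited, because errs is never closed
	}
	// unreachable; the runtime aborts with
	// "fatal error: all goroutines are asleep - deadlock!"
}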

I believe the initial solution with an RWMutex, shown below, avoids the panic

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
        ...
	c.errorsLock.RLock()
	defer c.errorsLock.RUnlock()

	select {
	case <-c.closed:
		// consumer is closed
		return
	default:
	}

	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}

func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
=>3		close(c.closed)
                 ...
		go func() {
			c.errorsLock.Lock()
			defer c.errorsLock.Unlock()
=>4			close(c.errors)
		}()

	...
	return
}

because:

  • close(c.closed) (=>3) happens before close(c.errors) (=>4)
  • when handleError holds the read lock, it first checks whether the consumer is closed (case <-c.closed:); if so, it returns there instead of sending err to the closed channel.
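Either way, this interleaving is exactly the kind of thing Go's race detector catches (the report at the top of this PR is race-detector output); re-running the tests with go test -race ./... is an easy way to double-check whichever variant gets merged.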

@dnwe dnwe added the fix label Sep 27, 2022
@dnwe (Collaborator) left a comment

Thanks!

@dnwe merged commit 338bf4f into IBM:main on Sep 27, 2022