Fix Rebalance events behavior for static membership and fix for consu… #757

Merged
merged 2 commits into from May 2, 2022

Contributor

@jliunyu jliunyu commented Apr 6, 2022

…mer close takes longer than 10s

EAGER rebalance: 6 partition(s) revoked: [testrebalance[0]@unset testrebalance[1]@unset testrebalance[2]@unset testrebalance[3]@unset testrebalance[4]@unset testrebalance[5]@unset]
Ignored OffsetsCommitted (<nil>, [testrebalance[0]@953 testrebalance[1]@900 testrebalance[2]@919 testrebalance[3]@unset testrebalance[4]@857 testrebalance[5]@925]) for consumer rdkafka#consumer-1
rdkafka#consumer-1, EAGER rebalance: 3 new partition(s) assigned: [testrebalance[0]@unset testrebalance[1]@unset testrebalance[2]@unset]
rdkafka#consumer-2, EAGER rebalance: 3 new partition(s) assigned: [testrebalance[3]@unset testrebalance[4]@unset testrebalance[5]@unset]
Message on testrebalance[1]@900:
test
EAGER rebalance: 3 partition(s) revoked: [testrebalance[0]@unset testrebalance[1]@unset testrebalance[2]@unset]
Ignored OffsetsCommitted (<nil>, [testrebalance[0]@unset testrebalance[1]@901 testrebalance[2]@unset]) for consumer rdkafka#consumer-1
EAGER rebalance: 3 partition(s) revoked: [testrebalance[3]@unset testrebalance[4]@unset testrebalance[5]@unset]
rdkafka#consumer-2, EAGER rebalance: 2 new partition(s) assigned: [testrebalance[2]@unset testrebalance[3]@unset]
rdkafka#consumer-1, EAGER rebalance: 2 new partition(s) assigned: [testrebalance[0]@unset testrebalance[1]@unset]
rdkafka#consumer-3, EAGER rebalance: 2 new partition(s) assigned: [testrebalance[4]@unset testrebalance[5]@unset]
rdkafka#consumer-4, EAGER rebalance: 2 new partition(s) assigned: [testrebalance[2]@unset testrebalance[3]@unset]
PASS
ok  	_/Users/jliu/Documents/Code/jliunyu/confluent-kafka-go/kafka	38.019s

Manually tested Close(): before this fix, closing the consumer took more than 10s; with this fix, the closing time is 730.602µs.

consumer closed. closing time: 730.602µs
PASS
ok  	_/Users/jliu/Documents/Code/jliunyu/confluent-kafka-go/kafka	0.573s

CLAassistant commented Apr 6, 2022

CLA assistant check
All committers have signed the CLA.

@jliunyu jliunyu mentioned this pull request Apr 6, 2022
7 tasks
Contributor

@edenhill edenhill left a comment

Good stuff!

for run {
	c.Poll(100)
	select {
	case <-doneChan:
Contributor

I think this will block until doneChan is written to, which means if Poll() is triggered once for some other reason, e.g., a stats callback, then it will hang here.

Contributor Author

Can we use int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 to exit the loop, the same way as before?

for run {
	c.Poll(100)
	select {
	case <-doneChan:
		run = false

	default:
		if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 {
			run = false
		}
	}
}

Contributor Author

I'm also confused about why it would hang. I tested this code with the existing change in this PR, and it doesn't hang:

func TestConsumerClose1(t *testing.T) {
	c, err := NewConsumer(&ConfigMap{
		"group.id": "gotest"})
	if err != nil {
		t.Fatalf("%s", err)
	}

	// Measure how long Close() takes to return.
	startClose := time.Now()
	err = c.Close()
	if err != nil {
		t.Fatalf("%s", err)
	}
	fmt.Printf("consumer closed. closing time: %v\n", time.Since(startClose))
}

Contributor

I think with the following sequence of events it will hang:

  1. application calls Close()
  2. Close() is blocking on c.Poll()
  3. A stats event (or something else which is not a rebalance event) triggers the Poll and Poll() wakes up and returns
  4. Close() is now blocking on doneChan
  5. librdkafka emits a rebalance event and waits for the app/go to ack it
  6. ...but since Close() is no longer calling Poll(), the rebalance event is never handled.
  7. indefinite hang

Contributor

I think this is easily solved by moving the Poll to the default case in the select, like in other places in the code.

Contributor Author

for run {
	select {
	case <-doneChan:
		// Is it possible that after doneChan fires there are still other
		// events we need to handle? We always have to wait to read the
		// doneChan value. If we exit from the default case, doneChan will
		// report some error.
		run = false

	default:
		c.Poll(100)
		if int(C.rd_kafka_queue_length(c.handle.rkq)) == 0 {
			run = false
		}
	}
}

Contributor

I don't think so. Once doneChan is done, we don't need to call Poll any more.

Contributor

Why is the queue_length() check needed?

Contributor Author

We don't need it, I already removed it from my code.

switch ev := event.(type) {
case AssignedPartitions:
	atomic.AddInt32(&assignedEvents, 1)
	fmt.Fprintf(os.Stderr,
Contributor

nit: Tests should use t.Logf if possible, but that might be tricky here in the callback.

Contributor Author

Yeah, is there any good way to use t *testing.T in the callback?

Contributor

I don't know, really. Go 🤷

Contributor

@edenhill edenhill left a comment

Do all existing tests pass now?

@@ -429,3 +431,277 @@ func TestConsumerLog(t *testing.T) {
}
}
}

func wrapRebalanceCb(assignedEvents *int32, revokedEvents *int32, t *testing.T) func(c *Consumer, event Event) error {
Contributor

👍 on the wrapping to provide the testing.T

return
}

wg := &sync.WaitGroup{}
Contributor

I think it is better to keep this as a non-pointer here, and then pass it as pointer when needed (to testPoll()).

Contributor Author

If we keep this as a non-pointer here, we need to use it as pointer each time, and there are a lot of use cases in the same function.

For example:
wg.Add(1)
wg.Wait()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pointer-receiver methods can also be called on non-pointer variables, based on this.

So a pointer is needed only to pass the WaitGroup struct by reference, not to call its pointer-receiver methods.

Contributor

> If we keep this as a non-pointer here, we need to use it as pointer each time, and there are a lot of use cases in the same function.

I still believe it is more correct to keep it as a non-pointer here and pass it as a pointer where needed.
But it is a nit, so do as you wish.

Contributor Author

jliunyu commented Apr 15, 2022

> Do all existing tests pass now?

Yes:)

jliu@C02DM32YMD6T kafka % go test -run "TestConsumer*" 
%4|1649999288.094|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property `fetch.wait.max.ms` (500) should be set lower than `socket.timeout.ms` (1000) by at least 1000ms to avoid blocking and timing out sub-sequent requests
rdkafka#consumer-6 Assigned partitions are: [testrebalance[0]@stored testrebalance[1]@stored testrebalance[2]@stored testrebalance[3]@stored testrebalance[4]@stored testrebalance[5]@stored]
rdkafka#consumer-7 Assigned partitions are: [testrebalance[3]@stored testrebalance[4]@stored testrebalance[5]@stored]
rdkafka#consumer-8 Assigned partitions are: [testrebalance[4]@stored testrebalance[5]@stored]
rdkafka#consumer-9 Assigned partitions are: [testrebalance[2]@stored testrebalance[3]@stored]
Static test passed

@jliunyu jliunyu force-pushed the staticMember branch 2 times, most recently from fe62e71 to f9ff23f Compare April 19, 2022 16:03
Contributor Author

jliunyu commented Apr 19, 2022

Contributor

@edenhill edenhill left a comment

Still some unaddressed comments in the test file.

Contributor

@edenhill edenhill left a comment

LGTM!

@jliunyu jliunyu merged commit e01dd29 into confluentinc:master May 2, 2022
@sermojohn

Is there a plan to make a release with this fix?

Contributor Author

jliunyu commented May 20, 2022

> Is there a plan to make a release with this fix?

Yes, this one will be released soon.


4 participants