
Consumer hanging on Close() call when redeploying application #767

Closed
stb2132 opened this issue Apr 21, 2022 · 15 comments

@stb2132

stb2132 commented Apr 21, 2022

Description

We are running into issues when shutting down consumers in our environment, where closing the consumer gets stuck and loops before our pod (we run in k8s) is forcibly terminated. This does not happen for every instance of our application, but on a restart or redeploy of all pods, 5-10% will end up hanging and forcibly terminated. Details on how many pods are running and how we redeploy are in the "How to Reproduce" section.

We think it has something to do with the unsubscribe() call in consumer.Close(), but are unsure. It could also be an implementation detail of how we are handling Assign/Revoke events. It is worth noting that we will be in the middle of a rebalance while making this call to consumer.Close(). We cannot use static membership due to the issue reported here:

Config/Env Details:

	cfgMap := &kafka.ConfigMap{
		"session.timeout.ms":                    "30000",
		"bootstrap.servers":                     "REDACTED",
		"broker.version.fallback":               "0.10.2",
		"group.id":                              "net-shadow",
		"log.connection.close":                  true,
		"go.logs.channel.enable":                true, 
		"message.max.bytes":                     "5000000",
		"go.application.rebalance.enable":       true,
		"enable.auto.commit":                    false,
		"enable.partition.eof":                  true, 
		"default.topic.config": kafka.ConfigMap{
			"auto.offset.reset": "latest",
		},
		"coordinator.query.interval.ms":         120000,
		"queued.max.messages.kbytes":            1048576,
		"fetch.wait.max.ms":                     100,
		"partition.assignment.strategy":         "cooperative-sticky",
	}

Code Details

We instantiate our consumer with a rebalanceCB and a SubscribeTopics call:

if err := consumer.SubscribeTopics(topics, r.rebalanceCallback); err != nil {
    return nil, errors.Wrap(err, "error subscribing to kafka topic")
}

Note: our rebalanceCB is a bit strange in that it is just a wrapper for calling our "handleEvent" function:

func (r *reader) rebalanceCallback(kafkaConsumer *kafka.Consumer, event kafka.Event) error {
	r.handleEvent(event, nil)
	return nil
}

Our handleEvent function (note: we have removed some log lines and if statements to keep the code concise):

func (r *reader) handleEvent(ev kafka.Event, decoder *decoder) {
	var err error
	switch e := ev.(type) {
	case kafka.AssignedPartitions:
		err = r.consumer.IncrementalAssign(e.Partitions)		
		if err != nil {
			log.Error("error assigning partitions")
		} 
	case kafka.RevokedPartitions:
		err = r.consumer.IncrementalUnassign(e.Partitions)
		if err != nil {
			log.Error("error revoking partitions")
		} 
	case *kafka.Message:
		// Redacted, but basically we pass message info onto a channel
		handler.in <- e
			
	case kafka.Error:
		log.Error("error received from kafka consumer")
	}
}

We have two goroutines that handle polling and offset commits respectively. The application shuts down by stopping the poll goroutine with stopReading(), then closing the offset commits channel, before finally attempting to close the consumer:

func (r *reader) stopReading() {
	r.stopReads.Set(true)
	r.readStopper.StopAndWait()
}

func (r *reader) close() {
	close(r.offsetCommits)
	r.stopper.StopAndWait()
	if r.events != nil {
		close(r.events)
	}

	log.Info("closing queue reader...", log.String("cluster", r.cluster.Name), log.Strings("topics", r.topics))

	if err := r.consumer.Close(); err != nil {
		log.Error("error closing consumer")
	} else {
		log.Info("consumer closed")
	}
}
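
A rough sketch of the polling goroutine (not shown in the original report; pollLoop and the stopReads.Get() accessor are assumed names, mirroring the stopReads.Set(true) call above):

func (r *reader) pollLoop() {
	// Keep polling until stopReading() flips the flag.
	for !r.stopReads.Get() {
		ev := r.consumer.Poll(100)
		if ev == nil {
			continue
		}
		r.handleEvent(ev, nil)
	}
}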

Additional Observations

According to our application logs, we stop reading and committing offsets as expected, but then hang when calling r.consumer.Close().

This can be found in the gist of client logs linked below, but I have included a snippet here:

"2022-04-19T16:01:20.357Z","2022-04-19T16:01:19.549434+00:00 INFO (queue/reader.go:479) - closing queue reader... cluster=kafka-net topics={flow_logs_topic}"
"2022-04-19T16:01:20.357Z","%7|1650384079.549|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group ""net-shadow"" received op SUBSCRIBE in state up (join-state wait-join)"
"2022-04-19T16:01:20.357Z","%7|1650384079.549|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group ""net-shadow"": subscribe to new unset subscription of 0 topics (join-state wait-join)"
"2022-04-19T16:01:20.357Z","%7|1650384079.549|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group ""net-shadow"": postponing subscribe until previous rebalance completes (join-state wait-join)"
"2022-04-19T16:01:20.357Z","%7|1650384079.606|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer"
"2022-04-19T16:01:20.357Z","%7|1650384079.606|CLOSE|rdkafka#consumer-1| [thrd:app]: Waiting for close events"

After this point in the logs, we saw a few commonalities in the pods that get stuck. We consistently see:

"2022-04-19T16:01:20.358Z","%7|1650384079.606|UNASSIGN|rdkafka#consumer-1| [thrd:app]: Forcing unassign of 13 partition(s)"
...
...
"2022-04-19T16:01:20.359Z","%4|1650384079.606|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group ""net-shadow"": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE"

and ultimately we will "hang" with this log line repeatedly reported:

"2022-04-19T16:01:36.424Z","%7|1650384095.480|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group ""net-shadow"": waiting for assign call, 13 toppar(s), 0 commit(s) (state up, join-state wait-unassign-call) before terminating"

How to reproduce

We can have this consistently occur in our setup, but I have not spun up a separate test.

We see this happen upon a deploy with 20 consumer instances running. With a rolling update that rotates 4 consumers every 30 seconds, usually 1-2 out of 20 end up stuck and forcibly terminated.

spec:
   replicas: 20
   minReadySeconds: 30
   strategy:
     type: RollingUpdate
     rollingUpdate:
       maxSurge: 4
       maxUnavailable: 0

Please let me know if you need more information or have further questions.

@jliunyu
Contributor

jliunyu commented Apr 21, 2022

This is the fix: #757

@stb2132
Author

stb2132 commented Apr 22, 2022

@jliunyu Thanks! Unfortunately, it looks like we're still seeing this.

I went ahead and pulled your change into my own fork and deployed:

I ran a rolling-restart and we saw the same pattern in the logs. I have included them here:

We're going to try dumping the goroutines to see if we can more closely determine where we're hanging.

@jliunyu
Contributor

jliunyu commented Apr 22, 2022

The PR is still pending merge, and the target release for the fix is v1.9.0.

@kevinconaway
Contributor

@jliunyu We tested the PR in a fork and it still does not fix the issue.

I was able to reproduce the issue with the following test case on my local broker. I created a topic close-debug with 256 partitions:

package kafka

import (
	"fmt"
	"testing"
	"time"
)

func TestCloseDebugging(t *testing.T) {
	cfgMap := &ConfigMap{
		"session.timeout.ms":              15000,
		"bootstrap.servers":               "127.0.0.1:9092",
		"broker.version.fallback":         "0.10.2.1",
		"group.id":                        "close-debug",
		"log.connection.close":            false,
		"go.logs.channel.enable":          false,
		"message.max.bytes":               500000,
		"go.application.rebalance.enable": true,
		"enable.auto.commit":              false,
		"enable.partition.eof":            true, // explicitly set this to true be compatible with librdkfaka 1.3.0
		"default.topic.config": ConfigMap{
			"auto.offset.reset": "latest",
		},
		"coordinator.query.interval.ms": 120000,
		"queued.max.messages.kbytes":    1048576,
		"fetch.wait.max.ms":             100,
		"partition.assignment.strategy": "cooperative-sticky",
	}

	consumer, err := newConsumer(cfgMap)

	if err != nil {
		t.Fatalf("error setting up consumer: %s", err)
	}

	go func() {
		fmt.Printf("Polling\n")
		consumer.startPoll()
	}()

	time.Sleep(10 * time.Second)
	fmt.Printf("Starting close\n")

	if err := consumer.close(); err != nil {
		fmt.Printf("error closing consumer: %s\n", err)
	}
	fmt.Printf("closed\n")
}

type testConsumer struct {
	consumer *Consumer
}

func newConsumer(cfgMap *ConfigMap) (*testConsumer, error) {
	consumer, err := NewConsumer(cfgMap)
	if err != nil {
		return nil, err
	}

	tc := &testConsumer{
		consumer: consumer,
	}

	err = consumer.SubscribeTopics([]string{"close-debug"}, func(consumer *Consumer, event Event) error {
		tc.handleEvent(event)
		return nil
	})
	if err != nil {
		return nil, err
	}

	return tc, nil
}

func (c *testConsumer) close() error {
	return c.consumer.Close()
}

func (c *testConsumer) startPoll() {
	for {
		ev := c.consumer.Poll(100)
		if ev == nil {
			continue
		}
		c.handleEvent(ev)
	}
}

func (c *testConsumer) handleEvent(event Event) {
	switch e := event.(type) {
	default:
		fmt.Printf("unhandled event: %+v\n", e)
	case PartitionEOF:

	case AssignedPartitions:
		fmt.Printf("received AssignedPartitions: %+v\n", partitionNumbers(e.Partitions))

		if err := c.consumer.IncrementalAssign(e.Partitions); err != nil {
			fmt.Printf("error on IncrementalAssign: %s\n", err)
			return
		}
		fmt.Printf("handled AssignedPartitions\n")
	case RevokedPartitions:
		fmt.Printf("received RevokedPartitions: %+v\n", partitionNumbers(e.Partitions))

		if err := c.consumer.IncrementalUnassign(e.Partitions); err != nil {
			fmt.Printf("error on IncrementalUnassign: %s\n", err)
			return
		}
		fmt.Printf("handled RevokedPartitions\n")
	case Error:
		fmt.Printf("received error from consumer: %s\n", e.Error())
	}
}

func partitionNumbers(tps []TopicPartition) []int32 {
	values := make([]int32, len(tps))
	for i, tp := range tps {
		values[i] = tp.Partition
	}
	return values
}

If I run this locally, it hangs with the following output:

$ go test -timeout 1m -run TestCloseDebugging -v
=== RUN   TestCloseDebugging
Polling
in handleRebalanceEvent
received AssignedPartitions: [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255]
handled AssignedPartitions
Starting close
%4|1650667119.384|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "close-debug": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE

If I remove the "partition.assignment.strategy" configuration setting and change IncrementalAssign/IncrementalUnassign to Assign/Unassign (or remove them entirely), it works as expected.
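
For illustration, a minimal sketch of that eager variant as a drop-in replacement for handleEvent above (handleEventEager is an assumed name; it presumes "partition.assignment.strategy" has been removed from the config so the default eager strategy is in effect):

func (c *testConsumer) handleEventEager(event Event) {
	switch e := event.(type) {
	case AssignedPartitions:
		// Eager protocol: replace the entire assignment with the new set.
		if err := c.consumer.Assign(e.Partitions); err != nil {
			fmt.Printf("error on Assign: %s\n", err)
		}
	case RevokedPartitions:
		// Eager protocol: drop the whole assignment; Unassign takes no arguments.
		if err := c.consumer.Unassign(); err != nil {
			fmt.Printf("error on Unassign: %s\n", err)
		}
	case Error:
		fmt.Printf("received error from consumer: %s\n", e.Error())
	}
}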

@jliunyu
Copy link
Contributor

jliunyu commented Apr 22, 2022

Thanks for your verification.

If the partition.assignment.strategy configuration setting is removed, it will use the default, range,roundrobin: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
From https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L1846, IncrementalAssign/IncrementalUnassign are only for cooperative assignors, such as cooperative-sticky.

You mentioned that if you remove IncrementalAssign/IncrementalUnassign and the "partition.assignment.strategy" configuration setting, it works. That means the default partition.assignment.strategy (range,roundrobin) works. If you don't have a requirement to use cooperative-sticky, please use the default. I'm going to take a look at the cooperative-sticky case.

BTW, if you don't use the channel-based consumer, please use the rebalance callback to handle rebalances; it will be called from the Poll() function. "go.application.rebalance.enable": true is not needed. Refer to #610
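
For reference, a minimal sketch of that pattern (broker address, topic, and the bounded poll loop are illustrative assumptions, not taken from this issue):

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":             "127.0.0.1:9092",
		"group.id":                      "close-debug",
		"partition.assignment.strategy": "cooperative-sticky",
		// note: no "go.application.rebalance.enable"
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// The callback is invoked from Poll(); since the application does not call
	// (Incremental)Assign/Unassign itself, the client applies the assignment.
	err = c.SubscribeTopics([]string{"close-debug"}, func(consumer *kafka.Consumer, ev kafka.Event) error {
		switch e := ev.(type) {
		case kafka.AssignedPartitions:
			fmt.Printf("assigned: %v\n", e.Partitions)
		case kafka.RevokedPartitions:
			fmt.Printf("revoked: %v\n", e.Partitions)
		}
		return nil
	})
	if err != nil {
		panic(err)
	}

	for i := 0; i < 100; i++ {
		if ev := c.Poll(100); ev != nil {
			fmt.Printf("event: %v\n", ev)
		}
	}
}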

@kevinconaway
Contributor

We do indeed want to use cooperative-sticky, thanks

@kevinconaway
Contributor

kevinconaway commented Apr 23, 2022

I spent a little more time looking at this and was able to come up with a potential candidate solution.

In rd_kafka_cgrp_handle_assign_op, I think the if/else if/else if should be reordered here so that it first checks whether the group is in error or terminating. As of now, it is hitting the first if block and seeing that the protocol type is cooperative but the assignment method is ASSIGN, not one of the INCR values.

I reordered the if/else and tested it locally and it works successfully.

However, the code also occasionally segfaults on close. It also segfaults on close occasionally when running with a normal roundrobin consumer (no cooperative-sticky) and without my changes. This leads me to believe that there is an underlying issue with the changes in #757. (Edit: I apologize, this was due to my test harness continuing to call Poll after calling Close.)
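
For anyone reusing the test harness above, a sketch of one way to avoid calling Poll after Close (the done/stopped channels are assumptions added for illustration and would need to be created with make(chan struct{}) in newConsumer):

type testConsumer struct {
	consumer *Consumer
	done     chan struct{} // closed by close() to request shutdown
	stopped  chan struct{} // closed by startPoll() once polling has stopped
}

func (c *testConsumer) startPoll() {
	defer close(c.stopped)
	for {
		select {
		case <-c.done:
			return // stop polling once close() has been requested
		default:
		}
		if ev := c.consumer.Poll(100); ev != nil {
			c.handleEvent(ev)
		}
	}
}

func (c *testConsumer) close() error {
	close(c.done)             // ask the poll loop to exit
	<-c.stopped               // wait until Poll is no longer being called
	return c.consumer.Close() // only then close the consumer
}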

@edenhill
Contributor

Good analysis, @kevinconaway!
Would you like to submit a PR for librdkafka to fix the order in assign_op()?

@kevinconaway
Contributor

Unfortunately I don't think I have the proficiency in C to write a proper test case to accompany the change. Would you mind if I deferred the change to you?

Thanks.

@kevinconaway
Contributor

kevinconaway commented Apr 29, 2022

@edenhill Is this something that you would be able to assist with or is it faster for me to attempt to cobble together a PR? I can certainly change the if order but adding a test might require some assistance if I don't get it right.

@kevinconaway
Contributor

Thanks so much for addressing this, @edenhill. What is the timeline for a 1.9.0 librdkafka release? If it's not relatively soon, would we be able to backport this to 1.8.x?

@edenhill
Contributor

edenhill commented May 3, 2022

If all goes well we should be able to release 1.9.0 next week.

We generally try to avoid backports.

@kevinconaway
Contributor

@edenhill Any update on when librdkafka 1.9.0 will be released?

@milindl
Contributor

milindl commented Aug 26, 2022

Closing this after testing on the latest release (1.9.2) that any form of (un)assign is permitted during close.

@milindl milindl closed this as completed Aug 26, 2022
GaryWilber added a commit to microsoft/FluidFramework that referenced this issue Dec 1, 2022
Calling `consumer.close()` was hanging indefinitely when the
`optimizedRebalance` config was enabled.

The issue is the same as
confluentinc/confluent-kafka-go#767

The `rebalance` method needs to call `consumer.unassign()` when closing.
It was not doing this because `this.consumer` was being set to
`undefined` before disconnecting.