This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Delivering disordered messages during shutdown(rebalance), causing message loss #255

Closed
longquanzheng opened this issue Aug 9, 2018 · 19 comments

Comments

@longquanzheng

longquanzheng commented Aug 9, 2018

We are experiencing message loss when restarting workers; it happens consistently.
But when I added these logs, the message loss disappears.

longquanzheng@0c61134

I think this is because the logging slows down the CommitOffsets() function. There must be some race condition with it.
Do you have any idea?

@dim
Member

dim commented Aug 10, 2018

@longquanzheng I am currently working on implementing cluster functionality into sarama directly - IBM/sarama#1099. Only a few small bits are missing at this point and I aim to finish those next week.

@dim
Member

dim commented Aug 10, 2018

@longquanzheng I am nevertheless happy to fix the race if you can identify how it happens. We have quite a "fuzzy" test in our suite (see https://github.com/bsm/sarama-cluster/blob/master/consumer_test.go#L284) which seems to pass without losing any messages.

@longquanzheng
Author

longquanzheng commented Aug 10, 2018

@dim I found that sarama-cluster delivers messages out of order from the moment Close() is called to shut down the library.

Here is an example: worker A owns the partition and consumes 1, 2, 3, ..., 9, 10; everything is fine. Then we call Close() to shut down, and it starts to receive lots of out-of-order messages, for example 100 (jumping over 11~99).
If worker A commits any of these, then worker B will have to start from 101, which means we miss 11~99. However, if worker A doesn't commit 100, then worker B can still continue from 11 and we won't miss any messages.

To mitigate this issue, I make our consuming process sleep for 2 seconds before shutting down sarama-cluster, and it works (uber/cadence@ec218d7).
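A minimal sketch of that workaround (shutdownConsumer and stopProcessing are hypothetical names standing in for our own code, not the actual cadence change):

package worker

import (
	"time"

	cluster "github.com/bsm/sarama-cluster"
)

// shutdownConsumer pauses before Close() so in-flight messages and pending
// CommitOffsets() calls can settle before sarama-cluster tears down.
func shutdownConsumer(consumer *cluster.Consumer, stopProcessing func()) error {
	stopProcessing()            // stop dispatching messages to our workers
	time.Sleep(2 * time.Second) // give pending commits time to finish
	return consumer.Close()     // then shut down sarama-cluster
}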

But this is not a final solution. We want to understand why sarama-cluster starts delivering out of order when we call Close().

Note that this issue only reproduces in our production environment when hosts are busy running lots of processes concurrently. We are not able to reproduce it on a laptop or idle hardware.

longquanzheng changed the title from Race condition in CommitOffsets() to Delivering disordered messages during shutdown on Aug 10, 2018
longquanzheng changed the title from Delivering disordered messages during shutdown to Delivering disordered messages during shutdown(rebalance) on Aug 10, 2018
longquanzheng changed the title from Delivering disordered messages during shutdown(rebalance) to Delivering disordered messages during shutdown(rebalance), causing message loss on Aug 10, 2018
@dim
Member

dim commented Aug 13, 2018

@longquanzheng from what I know, messages are only ordered per topic/partition. Also, Close() doesn't do anything specific, i.e. we are just closing the consumer and waiting for exit. Finally, the test I mentioned above exercises exactly this case: it starts consumers, shuts them down, and starts new ones, and no messages are missed, neither locally nor on CI. I am not sure how to debug this, as it doesn't seem to happen to us, nor has it been reported by anyone else before.

@longquanzheng
Author

@dim we finally found that the bug is here: georgeteo/sarama@9618a79

@georgeteo
Contributor

Actually, the issue is with the sarama/sarama-cluster contract. Specifically, in https://github.com/bsm/sarama-cluster/blob/master/partitions.go#L89, you use partitionConsumer.Close(), which drains the Messages channel.

We use the PartitionConsumer abstraction from sarama-cluster, and on Close or rebalance we continue reading from the Messages channel. But because Sarama is also reading from that channel, we end up with holes in our stream (e.g., we get msg 100, the sarama drain takes msg 101, and so on). Then, due to a race condition in the shutdown procedure, we may commit msg 102 and lose messages.

The proposed fix would be to use AsyncClose in sarama-cluster partitions.go:89.
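As a toy illustration of the problem (plain Go, not sarama code): two readers on a single channel each take a share of the remaining messages, so the "application" reader is left with gaps:

package main

import (
	"fmt"
	"sync"
)

func main() {
	msgs := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		msgs <- i
	}
	close(msgs)

	var (
		mu  sync.Mutex
		app []int
		wg  sync.WaitGroup
	)
	wg.Add(2)
	go func() { // the "application" reader
		defer wg.Done()
		for m := range msgs {
			mu.Lock()
			app = append(app, m)
			mu.Unlock()
		}
	}()
	go func() { // the "drain" reader, like the one Close() starts internally
		defer wg.Done()
		for range msgs {
		}
	}()
	wg.Wait()
	fmt.Println("application saw:", app) // typically a subset with holes, e.g. [1 2 4 5 7 ...]
}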

@georgeteo
Contributor

This is the PR with the fix for this: #258.

@georgeteo
Contributor

Thanks for accepting the PR. Can you tag a new release as well?

@dim
Member

dim commented Aug 16, 2018

@georgeteo looks like you didn't run the tests 😄

@danpmx

danpmx commented Aug 16, 2018

I tried the #258 fix today and reverted it.
Something looks wrong: the consumer is not consuming messages.
When listening on the Notifications channel, I receive infinite rebalance error messages.

@jpiper

jpiper commented Aug 16, 2018

By the way, #258 worked for me with kafkaConfig.Group.Mode = cluster.ConsumerModePartitions; it was only the default mode that crashed for me.

@georgeteo
Contributor

@danpmx, @jpiper, I'm unable to reproduce the crash. Can you post your consumer configuration?

When running the following non-partition-mode consumer code, I don't see either a crash or an infinite rebalance:

package main

import (
	"log"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	config := cluster.NewConfig()
	config.Group.Return.Notifications = true
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	consumer, err := cluster.NewConsumer([]string{"localhost:9092"}, "cg", []string{"test01"}, config)
	if err != nil {
		log.Fatal("unable to start consumer", err)
	}

	for {
		select {
		case msg, ok := <-consumer.Messages():
			if !ok {
				log.Println("ERROR: message channel is closed")
				continue
			}
			log.Printf("INFO: received offset %d from %s-%d\n", msg.Offset, msg.Topic, msg.Partition)
		case err, ok := <-consumer.Errors():
			if !ok {
				log.Println("ERROR: error channel is closed")
				continue
			}
			log.Printf("INFO: received error %s from sarama-cluster", err.Error())
		case ntf, ok := <-consumer.Notifications():
			if !ok {
				log.Println("ERROR: notification channel is closed")
				continue
			}
			log.Printf("INFO: received notification %s from sarama-cluster", ntf.Type.String())
		}
	}
}

Two consumer workers:

Worker 1

 ./sarama-test
2018/08/17 10:22:43 INFO: received notification rebalance start from sarama-cluster
2018/08/17 10:22:47 INFO: received notification rebalance OK from sarama-cluster
2018/08/17 10:22:48 INFO: received offset 19 from test01-0
2018/08/17 10:22:48 INFO: received offset 20 from test01-0
2018/08/17 10:22:48 INFO: received offset 21 from test01-0
# Worker 2 joins here
2018/08/17 10:22:51 INFO: received notification rebalance start from sarama-cluster
2018/08/17 10:22:51 INFO: received notification rebalance OK from sarama-cluster
2018/08/17 10:23:00 INFO: received offset 22 from test01-0
2018/08/17 10:23:02 INFO: received offset 23 from test01-0
2018/08/17 10:23:03 INFO: received offset 24 from test01-0
2018/08/17 10:23:04 INFO: received offset 25 from test01-0
# Worker 2 leaves here.
2018/08/17 10:23:12 INFO: received notification rebalance start from sarama-cluster
# Worker 1 leaves. 

Worker 2

# worker 1 leaves
./sarama-test
2018/08/17 10:23:10 INFO: received notification rebalance start from sarama-cluster
2018/08/17 10:24:06 INFO: received notification rebalance error from sarama-cluster
2018/08/17 10:24:06 INFO: received notification rebalance start from sarama-cluster
2018/08/17 10:24:09 INFO: received notification rebalance OK from sarama-cluster
2018/08/17 10:24:09 INFO: received offset 26 from test01-0
2018/08/17 10:24:09 INFO: received offset 27 from test01-0
2018/08/17 10:24:09 INFO: received offset 28 from test01-0
2018/08/17 10:24:09 INFO: received offset 29 from test01-0
2018/08/17 10:24:27 INFO: received notification rebalance start from sarama-cluster
2018/08/17 10:24:27 INFO: received notification rebalance OK from sarama-cluster

@dim: do you have any clues why the non-partition consumer mode might be broken with my recent change?

@jpiper

jpiper commented Aug 17, 2018

@georgeteo I’m using this config

kafkaConfig := cluster.NewConfig()
sarama.MaxResponseSize = 104857600
sarama.MaxRequestSize = 104857600

kafkaConfig.Version = sarama.V1_1_0_0 // Sarama will default to 0.8
kafkaConfig.Group.PartitionStrategy = cluster.StrategyRoundRobin
kafkaConfig.Consumer.Return.Errors = true
kafkaConfig.Group.Return.Notifications = true
kafkaConfig.ChannelBufferSize = 1000

@venkat1109

venkat1109 commented Aug 17, 2018

I am able to reproduce behavior identical to what @danpmx reported. I dug into this and the root cause appears to be a deadlock in the underlying sarama library (which existed even before the fix added by george). But the new fix causes this deadlock to manifest differently: after the fix, it shows up as a bunch of rebalance errors; before the fix, the deadlock led to the consumer not receiving any messages at all. Here is the potential bug I discovered:

  • Setup

    • Topic with two partitions
    • Each partition has about 10k messages
    • Partition ChannelBufferSize=32, dwellTimer=1s and maxProcessingTime=250ms
    • Consumer started in multiplexed mode; it sleeps 1ms after processing each message and sleeps for a second before consuming the first message
    • I trigger rebalances every 3-4s or so
  • Deadlock

    • Both partition consumers are started by the sarama library
    • A partition consumer blocks for a while and abandons its subscription (link_1 link_2) because the upstream is slow
    • While it's in this abandoned state, a rebalance is triggered
    • As part of the rebalance, sarama-cluster's consumer.nextTick() calls release() to release all subscriptions
    • Prior to the fix added by george (current master), this release() call blocks forever
    • After the fix added by george, Close() is replaced by AsyncClose(), so release() finishes, but then the subscribe() call in nextTick() fails with this error. This goes on in a loop and results in infinite rebalance errors

So, I see two issues now:

  • the message loss originally reported by this issue
  • the deadlock, which existed even before this fix

Whoever owns sarama: please verify whether my analysis above is valid and update this ticket.

@danpmx

danpmx commented Aug 19, 2018

@georgeteo our configuration:

conf := cluster.NewConfig()
conf.ChannelBufferSize = 1000
conf.Consumer.Return.Errors = true
conf.Group.Return.Notifications = true
conf.Group.Mode = cluster.ConsumerModePartitions

@imjustfly
Contributor

@venkat1109 The release() call will not block, because sarama.PartitionConsumer.Close() starts a goroutine to drain the messages channel. This is why a rebalance will lead to data loss.

rebalance -> release -> PartitionConsumer.Close() -> drain messages -> data loss

@venkat1109

@imjustfly - sarama.PartitionConsumer.Close() will block because of this for-range loop (and not because of the drain, which runs in its own goroutine):
https://github.com/Shopify/sarama/blob/master/consumer.go#L431

i.e. sarama.PartitionConsumer.Close() will return only after the child.errors channel is closed.

@imjustfly
Contributor

@venkat1109 child.errors is closed:

Close() -> AsyncClose() -> close(dying) -> close(trigger) -> close(feeder) -> close(errors)

@venkat1109

@imjustfly please see the sequence of steps I described above.

  • You are right that the AsyncClose() code path is supposed to close feeder
  • But when the upstream is slow, the responseFeeder goroutine will block here (because of expiryTicker), so feeder will never get closed
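To make the shape concrete, here is a toy model in plain Go (not sarama code, just the same channel pattern): "Close()" waits for an errors channel that is only closed after the feeder exits, while the feeder is stuck delivering to a consumer that is no longer reading:

package main

import (
	"fmt"
	"time"
)

func main() {
	messages := make(chan int) // unbuffered; nobody reads after the "rebalance"
	errs := make(chan error)

	// Stand-in for the feeder: it blocks delivering to a slow consumer,
	// so it never reaches the point where it would close(errs).
	go func() {
		messages <- 42 // blocks forever: the consumer is "slow"
		close(errs)
	}()

	// Stand-in for Close(): it only returns once errs is closed.
	done := make(chan struct{})
	go func() {
		for range errs {
		}
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("Close() returned")
	case <-time.After(2 * time.Second):
		fmt.Println("Close() is still blocked: the feeder never closed the errors channel")
	}
}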

vprithvi added a commit to jaegertracing/jaeger that referenced this issue Aug 23, 2018
- Shutdown all partitions before shutting down the sarama consumer. This sidesteps bsm/sarama-cluster#255 and ensures that the shutdown completes in a reasonable timeframe.
- Wait for PartitionConsumer shutdown before consuming messages
- Use Sarama's PartitionConsumer mock instead of relying on our own because it is richer and well tested.
isaachier pushed a commit to isaachier/jaeger that referenced this issue Sep 3, 2018
- Shutdown all partitions before shutting down the sarama consumer. This sidesteps bsm/sarama-cluster#255 and ensures that the shutdown completes in a reasonable timeframe.
- Wait for PartitionConsumer shutdown before consuming messages
- Use Sarama's PartitionConsumer mock instead of relying on our own because it is richer and well tested.
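A rough sketch of that shutdown order (not the jaeger code; it assumes the application uses ConsumerModePartitions and keeps the cluster.PartitionConsumer values it received from consumer.Partitions()):

package shutdown

import (
	cluster "github.com/bsm/sarama-cluster"
)

// closeAll shuts down every partition consumer first, drains what remains,
// and only then closes the parent consumer.
func closeAll(consumer *cluster.Consumer, partitions []cluster.PartitionConsumer) error {
	for _, pc := range partitions {
		pc.AsyncClose()
		// Drain the remaining messages ourselves so no other goroutine
		// races us for them; the channel is closed once shutdown completes.
		for range pc.Messages() {
		}
	}
	return consumer.Close()
}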