
Kafka commit problem due to session closure #150

Closed
alok87 opened this issue Mar 2, 2021 · 12 comments
Labels
bug Something isn't working

Comments

@alok87
Contributor

alok87 commented Mar 2, 2021

The loader consumer group can take a long time to load, depending on the size of the batch and on how loaded the Redshift cluster is.

What we have seen: when the cluster is resource starved, 100+ topics are being loaded to Redshift concurrently, and the batch the loader is operating on is big (hundreds of thousands of rows in one load), the commit to Kafka does not happen and the same batches keep getting reprocessed.

What does not work

  1. Big batch (msgBuf)
  2. Auto commit = false

What works

  1. Smaller batch (msgBuf)
  2. Auto commit = true (changed from false, the loader's default; see the config sketch below)
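
For reference, this is roughly how the autoCommit flag maps onto sarama's consumer offset settings. A minimal sketch only; the wiring from the loader's flag into the config is assumed, the field names are sarama's:

    // Sketch; imports: "time", "github.com/Shopify/sarama"
    cfg := sarama.NewConfig()

    // autoCommit=true: MarkOffset alone is enough, the offset manager
    // flushes marked offsets in the background every AutoCommit.Interval.
    cfg.Consumer.Offsets.AutoCommit.Enable = true
    cfg.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

    // autoCommit=false: background flushing is off, so the commit path
    // below has to call session.Commit() itself after MarkOffset.
    // cfg.Consumer.Offsets.AutoCommit.Enable = false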

The cause is not yet known. The relevant commit path in the loader:

    if len(msgBuf) > 0 {
        lastMessage := msgBuf[len(msgBuf)-1]
        // Mark the offset of the next message to consume.
        b.session.MarkOffset(
            lastMessage.Topic,
            lastMessage.Partition,
            lastMessage.Offset+1,
            "",
        )

        // With auto commit disabled, flush the marked offset synchronously.
        if !b.autoCommit {
            b.session.Commit()
            klog.V(2).Infof("%s, Committed (autoCommit=false)", lastMessage.Topic)
        }

        b.lastCommittedOffset = lastMessage.Offset
    } else {
        klog.Warningf("%s, markOffset not possible for empty batch", b.topic)
    }

b.session.Commit() does not return an error even though it is a synchronous call, so at present we are not sure why this happens: both marking the offset and committing are done, but the committed offset never shows up in Kafka.
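
Since Commit() is silent, one way to at least see what the offset manager is doing is to enable and drain the consumer group's error channel. A minimal sketch assuming the standard sarama consumer group API; whether a failed commit is actually delivered on this channel depends on the sarama version, and brokers is a placeholder:

    // Sketch: surface consumer/offset-manager errors that Commit() swallows.
    cfg := sarama.NewConfig()
    cfg.Consumer.Return.Errors = true

    group, err := sarama.NewConsumerGroup(brokers, "loader", cfg)
    if err != nil {
        klog.Fatal(err)
    }
    go func() {
        // Errors such as a generation/rebalance problem would show up here
        // instead of disappearing silently.
        for err := range group.Errors() {
            klog.Errorf("consumer group error: %v", err)
        }
    }()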

We are upgrading the Redshift cluster as a short-term fix.

Long term: discuss and debug to find the root cause.

@alok87 alok87 added the bug Something isn't working label Mar 2, 2021
@alok87
Contributor Author

alok87 commented Mar 2, 2021

The loader default was changed to auto commit = true in ab820c9.

@alok87
Contributor Author

alok87 commented Mar 3, 2021

Message commit scenarios tested after upgrading the Redshift cluster:

  • Test1: maxSize: 3, autoCommit: true ==> works ✅ (= 3 * 10k messages)
  • Test2: maxSize: 100, autoCommit: true ==> works ✅ (= 100 * 10k messages)
  • Test3: maxSize: 100, autoCommit: false ==> works ✅ (= 100 * 10k messages)

Cannot reproduce.

The weird thing is how the Redshift cluster is related to the Kafka commit problem at all. The only explanation I can think of is that the Kafka consumer session is somehow expiring due to slowness in the resource-starved Redshift cluster. Since sarama's Commit() does not return an error, we do not know whether anything went wrong.
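
One way to confirm from the broker side whether a commit actually landed, independent of the consumer, is to read the group's committed offsets back from Kafka. A rough sketch using sarama's ClusterAdmin; the broker list, group name and topic/partition are placeholders:

    // Sketch: read the committed offset of the loader's group for one
    // topic/partition directly from Kafka.
    cfg := sarama.NewConfig()
    cfg.Version = sarama.V2_0_0_0 // should match the broker version

    admin, err := sarama.NewClusterAdmin(brokers, cfg)
    if err != nil {
        klog.Fatal(err)
    }
    defer admin.Close()

    resp, err := admin.ListConsumerGroupOffsets("loader-customers", map[string][]int32{
        "customers": {0},
    })
    if err != nil {
        klog.Fatal(err)
    }
    klog.Infof("customers/0 committed offset: %d", resp.GetBlock("customers", 0).Offset)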

@alok87
Contributor Author

alok87 commented Mar 3, 2021

One more observation with autoCommit: false, maxSize: 100:

The first ConsumeClaim called Commit(), but the commit did not go through.
When the manager started a fresh ConsumeClaim and called Commit(), it did.

@alok87
Contributor Author

alok87 commented Mar 3, 2021

Reproduced.

  • Test4: maxSize: 1000, autoCommit: true ==> fails ❌ (= 1000 * 10k messages)
I0303 04:16:16.658142       1 load_processor.go:151] customers, offset: 3367, marking
I0303 04:16:16.658147       1 load_processor.go:158] customers, offset: 3367, marked and commited
I0303 04:16:16.658151       1 load_processor.go:679] customers, batchId:1, startOffset:2367, endOffset:3366: Processed

The next consumer session starts and the same thing happens: the big batch does not get committed.

Issue reproduced with auto commit true.

Finally, at 04:26, the consumer group's current offset started showing as 3367.

So the problem is the delay before the commit is reflected in Kafka when the batch size is huge. And this happens only when auto commit is true.

Solution
This is a distributed-systems problem, and the loader code should be resilient.

  • If auto commit is true, prevent reprocessing of a batch by checking whether its last committed offset has actually been reflected in Kafka (see the sketch below).
  • Or simply use autoCommit=false and always commit explicitly. The loader defaults to false, so it is not an issue there, but the batcher defaults to true.
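
A sketch of the first option: before reprocessing a batch, check whether its last offset is already committed. alreadyProcessed and committed are illustrative names, not the actual loader code; committed would be fetched from Kafka (e.g. via ListConsumerGroupOffsets), not from local state:

    // Sketch: skip a batch whose offsets have already been committed.
    func alreadyProcessed(committed int64, msgBuf []*sarama.ConsumerMessage) bool {
        if len(msgBuf) == 0 {
            return false
        }
        last := msgBuf[len(msgBuf)-1]
        // The loader commits lastMessage.Offset+1, so a batch whose last
        // offset is below the committed offset was already loaded.
        return last.Offset+1 <= committed
    }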

@alok87
Contributor Author

alok87 commented Mar 4, 2021

The issue was seen with maxSize=400 and autoCommit=true, and then with maxSize=400 and autoCommit=false as well.

Deleting one of the three Kafka pods and retrying with maxSize=3 and autoCommit=false got the offsets moving again...

@alok87
Contributor Author

alok87 commented Mar 4, 2021

Tracing what happens in sarama's session.Commit(), I saw the following error:

offset_manager.go:340  kafka server: The provided member is not known in the current generation.

Then I found IBM/sarama#1310, which says you get this generation error when you try to commit from a closed session.

This is why big batches were not working.

Increasing the Kafka consumer timeouts should help, and they should be configurable:

cfg.Consumer.Group.Session.Timeout = 20 * time.Second
cfg.Consumer.Group.Heartbeat.Interval = 6 * time.Second
cfg.Consumer.MaxProcessingTime = 500 * time.Millisecond

(Only MaxProcessingTime was tried.)
Also, sarama's logs are turned off by the operator; they should be turned on so this kind of tracing is not needed.
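
Putting this together, the consumer timeouts and sarama's own logger could be wired up roughly like this. A sketch only; the values are the ones quoted above and would need to be exposed as configuration:

    // Sketch; imports: "log", "os", "time", "github.com/Shopify/sarama"
    // Turn sarama's logger on so errors like the generation error above
    // are visible without manual tracing.
    sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

    cfg := sarama.NewConfig()
    cfg.Consumer.Group.Session.Timeout = 20 * time.Second
    cfg.Consumer.Group.Heartbeat.Interval = 6 * time.Second
    cfg.Consumer.MaxProcessingTime = 500 * time.Millisecond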

alok87 added a commit that referenced this issue Mar 4, 2021
New configurations added, as these values need tuning for every use case based on the type of processing and the size of the batch. We needed this to process huge batches of loads into the Redshift cluster. When the batch size was huge, timeouts were happening, leading to commits not going through. Details on why we are doing this are in this issue #150 (comment)
alok87 added a commit that referenced this issue Mar 4, 2021
At present the whole loader will shut down if the session expires for even one routine. We are doing this so that we learn of the problem fast enough, as we get CrashLoop alerts on continuous restarts. Later we can change this to restart the ConsumeClaim on session timeout and track the error rate in Prometheus.

#150 (comment)
@alok87
Contributor Author

alok87 commented Mar 4, 2021

Increased sarama's MaxProcessingTime in 6ea5b68.

This solves the issue.

@alok87 alok87 closed this as completed Mar 4, 2021
@alok87 alok87 reopened this Mar 4, 2021
@alok87
Contributor Author

alok87 commented Mar 4, 2021

The issue is still happening even after MaxProcessingTime was increased. It is now happening irrespective of batch size.

@alok87
Contributor Author

alok87 commented Mar 4, 2021

Deleted the Kafka pod and things started working! Only to fail again.

@alok87
Contributor Author

alok87 commented Mar 4, 2021

Trying to re-establish the session on expiry (5cc9052), as mentioned in IBM/sarama#1685.
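
The usual sarama pattern for this is to call Consume in a loop, so that a fresh session is set up every time the old one ends. A sketch of that pattern, not the actual commit; group, ctx, topics and handler are assumed to exist:

    // Sketch: re-establish the consumer group session whenever it ends.
    for {
        // Consume blocks for the lifetime of one session and returns when
        // the session is closed (rebalance, expiry, broker restart, ...).
        if err := group.Consume(ctx, topics, handler); err != nil {
            klog.Errorf("consume error: %v, retrying", err)
        }
        if ctx.Err() != nil {
            return // shutting down
        }
    }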

@alok87
Contributor Author

alok87 commented Mar 5, 2021

Things have been working out well for us after re-establishing sessions. At present we think the sessions were getting closed due to rebalancing, because every time it happens we see this error: loop check partition number coroutine will stop to consume.

From https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10:
Every consumer in a consumer group is assigned one or more topic partitions exclusively, and a Rebalance is the re-assignment of partition ownership among consumers.
A Rebalance happens when:
  • a consumer JOINS the group
  • a consumer SHUTS DOWN cleanly
  • a consumer is considered DEAD by the group coordinator. This may happen after a crash, or when the consumer is busy with long-running processing, which means no heartbeats have been sent by the consumer to the group coordinator within the configured session interval
  • new partitions are added

We will keep an eye on exactly which of these cases is triggering the rebalance: is it due to slow loader consumers? The sketch below logs session start and end from the handler to help correlate.
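
One cheap way to see when and why sessions end is to log from the handler's Setup and Cleanup hooks, which sarama calls at the start and end of every session. A sketch; loadHandler is a hypothetical name and ConsumeClaim is omitted:

    // Sketch: log every rebalance so session closures can be correlated
    // with slow loads.
    type loadHandler struct{}

    func (h *loadHandler) Setup(s sarama.ConsumerGroupSession) error {
        klog.Infof("session started, member: %s, claims: %v", s.MemberID(), s.Claims())
        return nil
    }

    func (h *loadHandler) Cleanup(s sarama.ConsumerGroupSession) error {
        klog.Infof("session ended (rebalance or shutdown), member: %s", s.MemberID())
        return nil
    }
    // ConsumeClaim (the existing message loop) is unchanged.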

@alok87 alok87 changed the title Kafka commit problem Kafka commit problem due to session closure Mar 5, 2021
@alok87
Contributor Author

alok87 commented Mar 12, 2021

This was caused by #160.

@alok87 alok87 closed this as completed Mar 12, 2021