Set consumer setting max.poll.interval.ms to rest proxy setting of co… #515

Open: wants to merge 3 commits into base: master

mmercedes

…nsumer.timeout.ms

If consumer.timeout.ms is set to a value greater than the default value of max.poll.interval.ms and a consumer has set auto.commit.enable=false, the Kafka brokers may consider the consumer failed and release its partition assignments while the REST Proxy still holds a consumer instance handle. The next call to poll, commitSync, or similar then throws an exception.

This commit sets max.poll.interval.ms equal to consumer.timeout.ms so that Kafka does not consider the consumer failed until the REST Proxy does as well.

Steps to reproduce the issue:

  • Use Kafka version 0.10.1.0 or higher (where max.poll.interval.ms was introduced)
  • Set consumer.timeout.ms to a value greater than the max.poll.interval.ms default of 5 minutes
  • Create a consumer via the REST Proxy
  • Sleep for longer than max.poll.interval.ms but less than consumer.timeout.ms
  • Have the consumer attempt to read a record or post an offset
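
The mismatch behind these steps, and the alignment this PR proposes, can be sketched with plain java.util.Properties. This is a minimal illustration and not the REST Proxy's actual code: the class and method names are hypothetical, and the 300000 ms fallback is assumed from the 5 minute default mentioned above.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only; not part of the REST Proxy.
final class PollIntervalAlignment {
    // Assumed fallback: the 5 minute max.poll.interval.ms default cited in the steps above.
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    // Returns true when consumer.timeout.ms exceeds the effective max.poll.interval.ms,
    // i.e. when the proxy would keep the instance longer than the poll interval allows.
    static boolean hasTimeoutGap(Properties props) {
        String proxyTimeout = props.getProperty("consumer.timeout.ms");
        if (proxyTimeout == null) {
            return false; // nothing to compare against
        }
        long maxPoll = Long.parseLong(props.getProperty(
                "max.poll.interval.ms", String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS)));
        return Long.parseLong(proxyTimeout) > maxPoll;
    }

    // Returns a copy in which max.poll.interval.ms mirrors consumer.timeout.ms.
    // If consumer.timeout.ms is absent, the copy is returned unchanged.
    static Properties align(Properties props) {
        Properties out = new Properties();
        out.putAll(props);
        String proxyTimeout = out.getProperty("consumer.timeout.ms");
        if (proxyTimeout != null) {
            out.setProperty("max.poll.interval.ms", proxyTimeout);
        }
        return out;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("consumer.timeout.ms", "600000"); // 10 minutes
        System.out.println("gap before align: " + hasTimeoutGap(props));
        System.out.println("gap after align:  " + hasTimeoutGap(align(props)));
    }
}
```

The null check in align is a deliberate choice in this sketch: copying a missing value as an empty string would leave max.poll.interval.ms set to something that cannot be parsed as a number.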

@ghost

ghost commented Dec 3, 2018

It looks like @mmercedes hasn't signed our Contributor License Agreement, yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence.
Wikipedia

You can read and sign our full Contributor License Agreement here.

Once you've signed reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

@mmercedes
Author

[clabot:check]

@ghost

ghost commented Dec 4, 2018

@confluentinc It looks like @mmercedes just signed our Contributor License Agreement. 👍

Always at your service,

clabot

dont allocate just to call toString()
@mmercedes
Author

Is anyone able to take a look at this?

@bgreenlee

Bump

// for max.poll.interval.ms then it is possible the consumer will be
// considered failed by the brokers while it has yet to hit the timeout
// for the rest proxy.
props.setProperty("max.poll.interval.ms", props.getProperty("consumer.timeout.ms", ""));
Member

The consumer should not be considered failed by the brokers if it violates max.poll.interval.ms. The validation is done on the client side (here: https://github.com/apache/kafka/blob/0d55f0f3ec8f97bc250b325481f6f2fa70f52a5c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1052)
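
As a rough model of a client-side check of this kind, the sketch below tracks the time of the last poll and reports when the configured interval has been exceeded. It is a simplified illustration under stated assumptions, not the Kafka source linked above; the class and method names are hypothetical.

```java
// Hypothetical, simplified model of a client-side poll deadline check.
// Not Kafka's implementation; names are chosen for illustration only.
final class PollDeadlineSketch {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollDeadlineSketch(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Called whenever the application polls; resets the deadline.
    void recordPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    // True once more than maxPollIntervalMs has elapsed since the last poll.
    boolean pollTimeoutExpired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        PollDeadlineSketch deadline = new PollDeadlineSketch(300_000L, 0L);
        System.out.println(deadline.pollTimeoutExpired(200_000L)); // within the interval
        System.out.println(deadline.pollTimeoutExpired(400_000L)); // past the interval
    }
}
```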

In the case of the v2 consumer, max.poll.interval.ms should be close to request.timeout.ms. For what it's worth, I don't think we should set this property in the code for the v2 consumer at all. Users can configure it in their proxy config by adding the consumer. prefix to it.
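
To illustrate the prefix approach, the sketch below shows how a proxy-level entry such as consumer.max.poll.interval.ms could be turned into a plain consumer setting by stripping the consumer. prefix. It is only a sketch of the idea using java.util.Properties: the class and method names are hypothetical, and it does not reproduce how the REST Proxy actually handles its configuration.

```java
import java.util.Properties;

// Hypothetical illustration of prefix-based overrides; not the REST Proxy's code.
final class ConsumerPrefixSketch {
    static final String PREFIX = "consumer.";

    // Collects every "consumer."-prefixed entry and returns it with the prefix removed.
    static Properties consumerOverrides(Properties proxyConfig) {
        Properties overrides = new Properties();
        for (String name : proxyConfig.stringPropertyNames()) {
            if (name.startsWith(PREFIX) && name.length() > PREFIX.length()) {
                overrides.setProperty(
                        name.substring(PREFIX.length()), proxyConfig.getProperty(name));
            }
        }
        return overrides;
    }

    public static void main(String[] args) {
        Properties proxyConfig = new Properties();
        // Example value only: 10 minutes.
        proxyConfig.setProperty("consumer.max.poll.interval.ms", "600000");
        proxyConfig.setProperty("bootstrap.servers", "localhost:9092");
        System.out.println(consumerOverrides(proxyConfig));
    }
}
```

With this approach the value stays a user decision in the proxy config instead of being hard-coded next to the consumer construction.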

Member

Additionally, consumer.timeout.ms is not something we have in any config nor is it documented anywhere in the proxy. It is part of the old consumer configs (https://kafka.apache.org/20/documentation.html#oldconsumerconfigs)

Member

@stanislavkozlovski left a comment

Hey @mmercedes, thank you for the interest in contributing to the project! Unfortunately, I think the changes here do not make much sense. Have you tested them locally and found any improvement?

// for max.poll.interval.ms then it is possible the consumer will be
// considered failed by the brokers while it has yet to hit the timeout
// for the rest proxy.
props.setProperty("max.poll.interval.ms", Integer.toString(iteratorTimeoutMs));
Member

The old consumer did not have the max.poll.interval.ms config at all, so this line shouldn't have any effect.

@mmercedes
Author

mmercedes commented Mar 27, 2019

@stanislavkozlovski Yes, I tested the change locally and it resolved the issue I described in my original comment. You can run these steps to reproduce the issue:

  • Use Kafka version 0.10.1.0 or higher (where max.poll.interval.ms was introduced)
  • Set consumer.timeout.ms to a value greater than the max.poll.interval.ms default of 5 minutes
  • Create a consumer via the REST Proxy
  • Sleep for longer than max.poll.interval.ms but less than consumer.timeout.ms
  • Have the consumer attempt to read a record or post an offset
