Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rebalance listener added to Consumer constructor #845

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

marcosschroh
Copy link

@marcosschroh marcosschroh commented Jul 6, 2022

Changes

Fixes #842

  • Now it is possible to set the ConsumerRebalanceListener when a Consumer instance is created.
  • The validation is made in the SubscriptionState._validate_rebalance_listener method.
  • The RebalanceListenerCT (TypeVar) was created. Using this, we can say that the listener should be a subtype of the abstract ConsumerRebalanceListener
  • sphinx_autodoc_typehints extension was added in order to make possible the TypeVar documentation. If you prefer not to use TypeVar we can remove this and the RebalanceListenerCT
  • Some typing added

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • Add a new news fragment into the CHANGES folder

aiokafka/abc.py Outdated Show resolved Hide resolved
aiokafka/consumer/consumer.py Show resolved Hide resolved
tests/test_consumer.py Outdated Show resolved Hide resolved
@marcosschroh marcosschroh force-pushed the feat/rebalance-listener-on-consumer-creation branch from 63e93af to 38d896c Compare August 19, 2022 12:19
@lgtm-com
Copy link

lgtm-com bot commented Aug 19, 2022

This pull request introduces 1 alert when merging 38d896c into 6360747 - view on LGTM.com

new alerts:

  • 1 for Missing call to `__init__` during object initialization

@marcosschroh marcosschroh force-pushed the feat/rebalance-listener-on-consumer-creation branch from 38d896c to 126a307 Compare August 19, 2022 12:36
@lgtm-com
Copy link

lgtm-com bot commented Aug 19, 2022

This pull request introduces 1 alert when merging 126a307 into 6360747 - view on LGTM.com

new alerts:

  • 1 for Missing call to `__init__` during object initialization

@tvoinarovskyi
Copy link
Member

Hi @marcosschroh,
Thank you for the contribution, could you elaborate as to why would it be essential to have a listener on creation time? As you can guess most listeners would require the back link to Consumer class, so as per Java's API we modeled the subscribe method to be the only place this can be passed. I feel like changing the API to set "consumer" is not completely backward compatible and I would like to have a good reason to include it. Thanks!

@tvoinarovskyi
Copy link
Member

For example, there can be code, where listener subclasses have "consumer" property, which is not settable.

@marcosschroh
Copy link
Author

marcosschroh commented Aug 19, 2022

Hi @tvoinarovskyi

Thanks for taking a look. I see 2 problems here:

  1. Why do I need to always use the Consumer.subscribe method if I want to use a RebalanceListener? It is a bit weird because I can consume immediately after creating a Consumer. In fact, the Consumer.__init__ method calls the _subscription.subscribe(...) which accepts a RebalanceListener, so it should be possible to set a listener during consumer creation time.

As you said:

most listeners would require the back link to Consumer class

I could see that when I saw the examples and tests. Most of the cases (probably always) you need a link to the consumer from the RebalanceListener instance, which made me realized that is only possible to set a RebalanceListener using the subscribe because the consumer instance is required (chicken/egg). This is leading to the second problem/question:

  1. Looks like the class aiokafka.abc.ConsumerRebalanceListener should have a def __init__(Optional[Consumer]) method. In all the test cases and examples the same code is repeated:
 class RebalanceListener(ConsumerRebalanceListener):

        def __init__(self, consumer):  # Always repeat the same
            self.consumer = consumer
        ...

There we can see that the end user always has to include the __init__(consumer) method.

Another problem (out of this scope) is that the interface defines sync methods, but in examples and test cases always async methods are defined (async def on_partitions_revoked and async def on_partitions_assigned).

This PR adds the def __init__(Optional[consumer] = None): ... to the class aiokafka.abc.ConsumerRebalanceListener, so end users won't have to define the RebalanceListener.__init__(consumer): ... every time . Second, it will be possible to set a listener on consumer creation time as you have a reference to self (Consumer instance).

This change shouldn't affect current users because they are already defining the RebalanceListener.__init__(consumer) and it will made things easier for new users that want to use a RebalanceListener. The consumer property will be always present, regardless whether the listener was set during consumer creation time or using the subscribe method.

Other alternative, could be that the callbacks receive the consumer together with the TopicPartitions:

 class RebalanceListener(ConsumerRebalanceListener):

        async def on_partitions_assigned(self, consumer, tp) -> None
            ...

I hope my explanation makes sense.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Why ConsumerRebalanceListener is not expose when create a Consumer?
4 participants