Can I use Azure Event Hubs with aiokafka? #945

Open
junyeong-huray opened this issue Nov 23, 2023 · 1 comment

junyeong-huray commented Nov 23, 2023

Hello

First of all, thanks for this nice project!

I have been using aiokafka in my project for a couple of months, and it has
been doing a great job.

Recently I have been preparing to use Azure Event Hubs. I assumed aiokafka
would work with Azure Event Hubs out of the box, because Azure Event Hubs
claims to be compatible with Kafka. The documentation says: "Since it supports
Apache Kafka, you bring Kafka workloads to Azure Event Hubs without doing any
code change." Wow.

But the thing is... when I try to produce a message to Azure Event Hubs using
AIOKafkaProducer, it fails to find the transaction coordinator and raises
KafkaError.

Code

Here is a minimal reproducible example of the problem.

import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)


async def test_kafka():
    import aiokafka.helpers
    import certifi
    from aiokafka import AIOKafkaProducer

    ssl_context = aiokafka.helpers.create_ssl_context(cafile=certifi.where())
    producer = AIOKafkaProducer(
        bootstrap_servers='my-kafka-ns.servicebus.windows.net:9093',
        enable_idempotence=True,
        transactional_id='some-id',
        transaction_timeout_ms=20000,
        security_protocol='SASL_SSL',
        sasl_plain_username='$ConnectionString',
        sasl_plain_password=
        'Endpoint=sb://my-kafka-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=my-shared-access-key',
        sasl_mechanism='PLAIN',
        ssl_context=ssl_context)
    await producer.start()
    async with producer.transaction():
        await producer.send_and_wait('aztest',
                                     b'aztest-data-1234',
                                     key=b'message-key')


asyncio.run(test_kafka())

Log

And here is the debug console log.

root@610947149cd0:/head# poetry run python head/aztest.py 
DEBUG:asyncio:Using selector: EpollSelector
INFO:aiokafka.helpers:Loading SSL CA from /root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/certifi/cacert.pem
DEBUG:aiokafka.producer.producer:Starting the Kafka producer
DEBUG:aiokafka:Attempting to bootstrap via node at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 1: SaslHandShakeRequest_v0(mechanism='PLAIN')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 1: SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
INFO:aiokafka.conn:Authenticated as $ConnectionString via PLAIN
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 2: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 2: MetadataResponse_v0(brokers=[(node_id=0, host='my-kafka-ns.servicebus.windows.net', port=9093)], topics=[(error_code=0, topic='aztest', partitions=[(error_code=0, partition=0, leader=0, replicas=[], isr=[])])])
DEBUG:aiokafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG:aiokafka.conn:Closing connection at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka:Received cluster metadata: ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG:aiokafka:Initiating connection to node 0 at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 1: SaslHandShakeRequest_v0(mechanism='PLAIN')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 1: SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
INFO:aiokafka.conn:Authenticated as $ConnectionString via PLAIN
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 2: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 2: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=3, max_version=7), (api_key=1, min_version=4, max_version=6), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=5), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=7), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=6), (api_key=20, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=1), (api_key=23, min_version=0, max_version=0), (api_key=32, min_version=0, max_version=2), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0)])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 3: MetadataRequest_v0(topics=[])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 3: MetadataResponse_v0(brokers=[(node_id=0, host='my-kafka-ns.servicebus.windows.net', port=9093)], topics=[(error_code=0, topic='aztest', partitions=[(error_code=0, partition=0, leader=0, replicas=[], isr=[])])])
DEBUG:aiokafka.conn:Closing connection at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka:Sending FindCoordinator request for key 309a1400-c988-4446-b8ff-73d9cc1abf6d to broker 0
DEBUG:aiokafka:Initiating connection to node 0 at my-kafka-ns.servicebus.windows.net:9093
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 1: ApiVersionRequest_v0()
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=3, max_version=7), (api_key=1, min_version=4, max_version=6), (api_key=2, min_version=0, max_version=7), (api_key=3, min_version=0, max_version=5), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=8), (api_key=10, min_version=0, max_version=4), (api_key=11, min_version=0, max_version=7), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=5), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=4), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=6), (api_key=20, min_version=0, max_version=2), (api_key=22, min_version=0, max_version=1), (api_key=23, min_version=0, max_version=0), (api_key=32, min_version=0, max_version=2), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0)])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 2: SaslHandShakeRequest_v1(mechanism='PLAIN')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 2: SaslHandShakeResponse_v1(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 3: SaslAuthenticateRequest_v1(sasl_auth_bytes=b'$ConnectionString\x00$ConnectionString\x00Endpoint=sb://my-kafka-ns.servicebus.windows.net/;SharedAccessKeyNam...')
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 3: SaslAuthenticateResponse_v1(error_code=0, error_message=None, sasl_auth_bytes=b'', session_lifetime_ms=0)
INFO:aiokafka.conn:Authenticated as $ConnectionString via PLAIN
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Request 4: FindCoordinatorRequest_v1(coordinator_key='309a1400-c988-4446-b8ff-73d9cc1abf6d', coordinator_type=1)
DEBUG:aiokafka.conn:<AIOKafkaConnection host=my-kafka-ns.servicebus.windows.net port=9093> Response 4: FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=42, error_message='FindCoordinator asked for coordinator with type code 1 which is not supported.', coordinator_id=0, host='', port=0)
DEBUG:aiokafka:Received group coordinator response FindCoordinatorResponse_v1(throttle_time_ms=0, error_code=42, error_message='FindCoordinator asked for coordinator with type code 1 which is not supported.', coordinator_id=0, host='', port=0)
ERROR:aiokafka.producer.sender:FindCoordinator Request failed: [Error 42] InvalidRequestError
Traceback (most recent call last):
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 198, in _find_coordinator
    coordinator_id = await self.client.coordinator_lookup(
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/client.py", line 682, in coordinator_lookup
    raise err
kafka.errors.InvalidRequestError: [Error 42] InvalidRequestError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/head/head/aztest.py", line 33, in <module>
    asyncio.run(test_kafka())
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/head/head/aztest.py", line 26, in test_kafka
    await producer.start()
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/producer.py", line 321, in start
    await self._sender.start()
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 55, in start
    await self._maybe_wait_for_pid()
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 178, in _maybe_wait_for_pid
    node_id = await self._find_coordinator(
  File "/root/.cache/pypoetry/virtualenvs/head-ai4CaeRi-py3.10/lib/python3.10/site-packages/aiokafka/producer/sender.py", line 213, in _find_coordinator
    raise KafkaError(repr(err))
kafka.errors.KafkaError: KafkaError: InvalidRequestError()
ERROR:asyncio:Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0xffffb6acb190>
root@610947149cd0:/head# 

Can anyone give me an idea of how to solve this problem?

Thanks,

@junyeong-huray
Author

I found that it works if I don't pass transactional_id to AIOKafkaProducer.
I guess the error occurs when I use the Kafka transaction feature, because
Azure Event Hubs does not provide that feature.
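
The ApiVersionResponse in the debug log above seems consistent with this guess. Here is a small sketch that compares the API keys the broker advertised against the Kafka protocol API keys used for transactions. The key list is copied from the log; the names `TRANSACTION_API_KEYS`, `EVENT_HUBS_API_KEYS` and `missing_transaction_apis` are just mine for illustration and are not part of aiokafka.

```python
# Kafka protocol API keys that are only needed for transactions.
TRANSACTION_API_KEYS = {
    24: "AddPartitionsToTxn",
    25: "AddOffsetsToTxn",
    26: "EndTxn",
    28: "TxnOffsetCommit",
}

# API keys copied from the ApiVersionResponse_v0 in the debug log.
EVENT_HUBS_API_KEYS = [
    0, 1, 2, 3, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
    20, 22, 23, 32, 36, 37, 42, 47,
]


def missing_transaction_apis(advertised_keys):
    """Return the names of transaction APIs the broker did not advertise."""
    advertised = set(advertised_keys)
    return [
        name
        for key, name in sorted(TRANSACTION_API_KEYS.items())
        if key not in advertised
    ]


print(missing_transaction_apis(EVENT_HUBS_API_KEYS))
# → ['AddPartitionsToTxn', 'AddOffsetsToTxn', 'EndTxn', 'TxnOffsetCommit']
```

None of the transaction APIs are advertised, while InitProducerId (api_key=22) is, which may be why only the transactional part fails.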

Here is the Azure Event Hubs documentation. It says Event Hubs lacks some
Kafka features, such as transactions, compression and Kafka Streams.

https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview#feature-differences-with-apache-kafka
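
For anyone hitting the same error, here is a rough sketch of the workaround, assuming you keep your producer settings in a dict. The helper names `without_transactions` and `send_one` are hypothetical, not aiokafka APIs. Dropping `transaction_timeout_ms` as well is my own choice, since it should only matter for transactions; the only thing I actually verified is that removing `transactional_id` avoids the error.

```python
# Options that are only meaningful for transactional producers.
# Assumption: transaction_timeout_ms is dropped together with transactional_id.
TRANSACTION_ONLY_OPTIONS = ("transactional_id", "transaction_timeout_ms")


def without_transactions(producer_kwargs):
    """Return a copy of the producer settings without transaction options."""
    return {
        name: value
        for name, value in producer_kwargs.items()
        if name not in TRANSACTION_ONLY_OPTIONS
    }


async def send_one(producer_kwargs, topic, value, key=None):
    """Send a single message without opening a Kafka transaction."""
    # Imported here so the helper above can be used without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(**without_transactions(producer_kwargs))
    await producer.start()
    try:
        # No `async with producer.transaction()` block here.
        await producer.send_and_wait(topic, value, key=key)
    finally:
        # Always stop the producer to avoid the "Unclosed AIOKafkaProducer" error.
        await producer.stop()
```

With the same settings as in the example from my first comment, this would be called as `asyncio.run(send_one(settings, 'aztest', b'aztest-data-1234', key=b'message-key'))`. Note that this gives up the atomicity that a transaction would provide.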
