High Incoming request sum on Azure Event Hub #990

Open
jmd9019 opened this issue Mar 19, 2024 · 0 comments

Describe the bug
I was able to connect to Azure Event Hub, but when I run the producer and consumer and connect to a topic, the Incoming Requests (Sum) and Successful Requests (Sum) metrics spike to 4K and stay constantly at 4K, even when no messages are being sent.

Expected behavior
Since only one pod is running this producer and consumer, I would expect far fewer Incoming and Successful Requests (Sum). For comparison, I have another Kubernetes deployment running a producer and consumer written in Node.js, with a minimum of 5 pods running at all times, and it barely reaches 1K Incoming and Successful Requests (Sum).

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): "0.8.1"
  • Kafka Broker version (kafka-topics.sh --version):
  • Other information (Confluent Cloud version, etc.): Azure Event Hub as Kafka

Reproducible example

Consumer

    async def _subscribe_to_topic(self, topic: str, group_id: str):
        consumer = AIOKafkaConsumer(topic, 
                                    bootstrap_servers=self.server, 
                                    group_id=group_id,
                                    sasl_plain_username='$ConnectionString',
                                    sasl_plain_password=self.kafkaPass,
                                    sasl_mechanism='PLAIN',)
        await consumer.start()
        print("consumer started")
        
        try:
            async for msg in consumer:
                payload = json.loads(msg.value)
                print(payload)
                try:
                    charging_station_id = payload.get("charging_station_id")
                    await self._process_payload(charging_station_id, payload)
                except Exception as e:
                    logger.error(f"Error processing payload: {e}, payload: {payload}")
        finally:
            await consumer.stop()
            print("consumer stopped")


Producer

    async def _publish(self, topic: str, envelope: Envelope):
        envelope.timestamp = datetime.now(timezone.utc)

        endpoint = os.getenv("PUPSUB_KAFKA_SERVER")
        kafkaPass = os.getenv("PUPSUB_KAFKA_PASSWORD")
        if endpoint is None or kafkaPass is None:
            raise ValueError("PUPSUB_KAFKA_SERVER or PUPSUB_KAFKA_PASSWORD not set")

        producer = AIOKafkaProducer(bootstrap_servers=endpoint,
                                    sasl_mechanism='PLAIN',
                                    sasl_plain_username='$ConnectionString',
                                    sasl_plain_password=kafkaPass,
                                    enable_idempotence=True)
        # Get cluster layout and initial topic/partition leadership information
        await producer.start()
        try:
            # Produce message (the value must be bytes, so encode the JSON string)
            await producer.send_and_wait(topic, envelope.json().encode("utf-8"))
        finally:
            # Wait for all pending messages to be delivered or expire.
            await producer.stop()

[Screenshot 2024-03-19 163026: Azure Event Hub request metrics]
