aio-libs / aiokafka

asyncio client for Kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

High Incoming request sum on Azure Event Hub #990

Open jmd9019 opened 3 months ago

jmd9019 commented 3 months ago

Describe the bug I was able to connect to Azure Event Hub, but when I run the producer and consumer and connect to a topic, the Incoming Requests (Sum) and Successful Requests (Sum) metrics spike to 4K and stay constant at 4K even when no messages are being sent.

Expected behavior Since only one pod is running the producer and consumer, I would expect far fewer Incoming Requests (Sum) and Successful Requests (Sum). For comparison, I have another Kubernetes deployment running a producer and consumer written in Node.js, with a minimum of 5 pods running at all times, and it barely reaches 1K Incoming Requests (Sum) and Successful Requests (Sum).

Environment (please complete the following information):

Reproducible example

Consumer

    async def _subscribe_to_topic(self, topic: str, group_id: str):
        consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self.server,
            group_id=group_id,
            sasl_plain_username='$ConnectionString',
            sasl_plain_password=self.kafkaPass,
            sasl_mechanism='PLAIN',
        )
        await consumer.start()
        print("consumer started")

        try:
            async for msg in consumer:
                payload = json.loads(msg.value)
                print(payload)
                try:
                    charging_station_id = payload.get("charging_station_id")
                    await self._process_payload(charging_station_id, payload)
                except Exception as e:
                    logger.error(f"Error processing payload: {e}, payload: {payload}")
        finally:
            await consumer.stop()
            print("consumer stopped")

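For context on why the request count can stay high even when no messages flow: a Kafka consumer long-polls the broker continuously, and aiokafka's defaults (`fetch_max_wait_ms=500`, `heartbeat_interval_ms=3000`, `auto_commit_interval_ms=5000`) mean an idle consumer still issues a steady stream of fetch, heartbeat, and offset-commit requests, each of which Event Hubs may count as an incoming request. A rough back-of-the-envelope estimate follows; the real totals scale with partition and broker counts, so treat this as a sketch, not a diagnosis:

```python
def idle_requests_per_hour(
    fetch_max_wait_ms: int = 500,        # aiokafka default long-poll timeout
    heartbeat_interval_ms: int = 3000,   # aiokafka default
    auto_commit_interval_ms: int = 5000  # aiokafka default
) -> int:
    """Rough per-consumer request rate while no messages are flowing."""
    ms_per_hour = 3_600_000
    fetches = ms_per_hour // fetch_max_wait_ms       # empty fetch responses
    heartbeats = ms_per_hour // heartbeat_interval_ms
    commits = ms_per_hour // auto_commit_interval_ms
    return fetches + heartbeats + commits

print(idle_requests_per_hour())  # 7200 + 1200 + 720 = 9120
```

Raising `fetch_max_wait_ms` on the consumer would reduce the fetch component at the cost of slightly higher latency, which may be worth testing when comparing against the Node.js client's settings.
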
Producer

    async def _publish(self, topic: str, envelope: Envelope):
        envelope.timestamp = datetime.now(timezone.utc)

        endpoint = os.getenv("PUPSUB_KAFKA_SERVER")
        kafkaPass = os.getenv("PUPSUB_KAFKA_PASSWORD")
        if endpoint is None or kafkaPass is None:
            raise ValueError("PUPSUB_KAFKA_SERVER or PUPSUB_KAFKA_PASSWORD not set")

        producer = AIOKafkaProducer(bootstrap_servers=endpoint,
                                    sasl_mechanism='PLAIN',
                                    sasl_plain_username='$ConnectionString',
                                    sasl_plain_password=kafkaPass,
                                    enable_idempotence=True)
        # Get cluster layout and initial topic/partition leadership information
        await producer.start()
        try:
            # Produce message
            await producer.send_and_wait(topic, envelope.json())
        finally:
            # Wait for all pending messages to be delivered or expire.
            await producer.stop()

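One thing that stands out in the producer snippet: a new `AIOKafkaProducer` is created, started, and stopped for every call to `_publish`. Each `start()` performs its own bootstrap and metadata round-trips, which adds requests on top of the actual sends. A common pattern is to create the producer once and reuse it across publishes. Below is a minimal sketch of that pattern; `StubProducer` is a hypothetical stand-in for `AIOKafkaProducer` so the example runs without a broker:

```python
import asyncio


class StubProducer:
    """Stand-in for AIOKafkaProducer: counts start() calls for the demo."""
    started = 0

    async def start(self):
        StubProducer.started += 1

    async def stop(self):
        pass

    async def send_and_wait(self, topic, value):
        pass


class Publisher:
    """Lazily creates one shared producer instead of one per publish."""

    def __init__(self):
        self._producer = None
        self._lock = asyncio.Lock()

    async def _get_producer(self):
        async with self._lock:
            if self._producer is None:
                p = StubProducer()  # in real code: AIOKafkaProducer(...)
                await p.start()     # bootstrap happens once, not per message
                self._producer = p
        return self._producer

    async def publish(self, topic, value):
        producer = await self._get_producer()
        await producer.send_and_wait(topic, value)

    async def close(self):
        if self._producer is not None:
            await self._producer.stop()


async def main():
    StubProducer.started = 0  # reset the counter for the demo
    pub = Publisher()
    for i in range(100):
        await pub.publish("telemetry", f"msg-{i}")
    await pub.close()
    return StubProducer.started

print("producer starts for 100 messages:", asyncio.run(main()))
# prints: producer starts for 100 messages: 1
```

Whether this accounts for the full 4K is unclear without the screenshot's time window, but per-call producer creation is a likely contributor worth ruling out.
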
[Attached screenshot: Screenshot 2024-03-19 163026]